import bz2
import csv
import os
import asyncio
from io import TextIOWrapper
import random

import numpy as np
import pytest
import pytest_asyncio
from redis import ResponseError
import redis.asyncio as redis
import redis.commands.search.aggregation as aggregations
from redis.commands.search.hybrid_query import (
    CombinationMethods,
    CombineResultsMethod,
    HybridCursorQuery,
    HybridFilter,
    HybridPostProcessingConfig,
    HybridQuery,
    HybridSearchQuery,
    HybridVsimQuery,
    VectorSearchMethods,
)
from redis.commands.search.hybrid_result import HybridCursorResult
import redis.commands.search.reducers as reducers
from redis.commands.search import AsyncSearch
from redis.commands.search.field import (
    GeoField,
    NumericField,
    TagField,
    TextField,
    VectorField,
)
from redis.commands.search.index_definition import IndexDefinition, IndexType
from redis.commands.search.query import GeoFilter, NumericFilter, Query, SortbyField
from redis.commands.search.result import Result
from redis.commands.search.suggestion import Suggestion
from redis.utils import safe_str
from tests.conftest import (
    is_resp2_connection,
    skip_if_redis_enterprise,
    skip_if_resp_version,
    skip_if_server_version_gte,
    skip_if_server_version_lt,
    skip_ifmodversion_lt,
)

# Absolute paths of the fixture files stored in the ``testdata`` directory
# that sits next to this test module.
WILL_PLAY_TEXT, TITLES_CSV = (
    os.path.abspath(os.path.join(os.path.dirname(__file__), "testdata", fixture))
    for fixture in ("will_play_text.csv.bz2", "titles.csv")
)


class AsyncSearchTestsBase:
    """Fixtures and helpers shared by the async RediSearch test classes."""

    @pytest_asyncio.fixture()
    async def decoded_r(self, create_redis, stack_url):
        """Client created with ``decode_responses=True`` against ``stack_url``."""
        return await create_redis(decode_responses=True, url=stack_url)

    @staticmethod
    async def waitForIndex(env, idx, timeout=None):
        """Poll ``FT.INFO idx`` until the index reports ``indexing == 0``.

        Two reply shapes are handled: a flat ``[key, value, ...]`` sequence
        (looked up with ``.index``) and a mapping (looked up by key, used when
        the reply has no ``.index`` method). Polling also stops when the
        ``indexing`` field is missing or not an integer. A ``ResponseError``
        (e.g. the index does not exist yet) is retried after a short sleep.

        :param env: client used to send ``FT.INFO``.
        :param idx: name of the index to wait for.
        :param timeout: optional budget in seconds of accumulated sleep;
            ``None`` polls until the index is ready.
        """
        delay = 0.1
        while True:
            try:
                res = await env.execute_command("FT.INFO", idx)
                if int(res[res.index("indexing") + 1]) == 0:
                    break
            except ValueError:
                # "indexing" is absent from the sequence, or not an integer.
                break
            except AttributeError:
                # Mapping-shaped reply (no ``.index``): look the field up by key.
                try:
                    if int(res["indexing"]) == 0:
                        break
                except (KeyError, ValueError):
                    # Missing / non-integer field: stop, like the sequence case.
                    break
            except ResponseError:
                # index doesn't exist yet
                # continue to sleep and try again
                pass

            await asyncio.sleep(delay)
            if timeout is not None:
                timeout -= delay
                if timeout <= 0:
                    break

    @staticmethod
    def getClient(decoded_r: redis.Redis):
        """
        Return the client that tests use to create indexes (currently the
        given ``decoded_r`` itself)
        """
        return decoded_r

    @staticmethod
    async def createIndex(decoded_r, num_docs=100, definition=None):
        """Create the ``play``/``txt``/``chapter`` index and load play chapters.

        Rows of ``WILL_PLAY_TEXT`` (``;``-separated) are grouped into one hash
        per lower-cased ``"<play>:<chapter>"`` key, appending each row's text
        to that chapter's ``txt``. Reading stops as soon as ``num_docs``
        distinct chapters have been seen (so the last chapter holds only its
        first row). The hashes are then written through a ``batch_indexer``
        and committed.

        :param decoded_r: search client (``client.ft(...)``) to index into.
        :param num_docs: number of chapter documents to load.
        :param definition: optional ``IndexDefinition`` for the index.
        """
        fields = (
            TextField("play", weight=5.0),
            TextField("txt"),
            NumericField("chapter"),
        )
        try:
            await decoded_r.create_index(fields, definition=definition)
        except redis.ResponseError:
            # Presumably a leftover index with the same name: drop it together
            # with its documents and retry exactly once. A second failure is
            # raised as-is instead of recursing into a misleading dropindex error.
            await decoded_r.dropindex(delete_documents=True)
            await decoded_r.create_index(fields, definition=definition)

        chapters = {}
        # Closing the text wrapper also closes the wrapped BZ2File.
        with TextIOWrapper(bz2.BZ2File(WILL_PLAY_TEXT), encoding="utf8") as bzfp:
            for line in csv.reader(bzfp, delimiter=";"):
                play, chapter, text = line[1], line[2], line[5]

                key = f"{play}:{chapter}".lower()
                d = chapters.setdefault(key, {})
                d["play"] = play
                d["txt"] = d.get("txt", "") + " " + text
                d["chapter"] = int(chapter or 0)
                if len(chapters) == num_docs:
                    break

        indexer = decoded_r.batch_indexer(chunk_size=50)
        assert isinstance(indexer, AsyncSearch.BatchIndexer)
        assert 50 == indexer.chunk_size

        for key, doc in chapters.items():
            await indexer.client.client.hset(key, mapping=doc)
        await indexer.commit()


class TestBaseSearchFunctionality(AsyncSearchTestsBase):
    @pytest.mark.redismod
    async def test_client(self, decoded_r: redis.Redis):
        """End-to-end check of the search client on the play-text dataset.

        Loads ``num_docs`` chapter documents through ``createIndex``, checks
        that ``FT.INFO`` exposes the expected statistics keys, then runs a
        series of queries (plain, no-content, verbatim, field-limited,
        ``load_document``, in-keys, slop / in-order, and document deletion)
        and compares them with hard-coded result counts. Those counts assume
        the bundled ``WILL_PLAY_TEXT`` data and ``num_docs = 500``.

        The RESP2 branch reads ``Result`` / document attributes; the other
        branch performs the same calls in the same order and reads the
        equivalent data from the returned dicts.
        """
        num_docs = 500
        await self.createIndex(decoded_r.ft(), num_docs=num_docs)
        await self.waitForIndex(decoded_r, "idx")
        # verify FT.INFO exposes (at least) these statistics keys
        info = await decoded_r.ft().info()
        for k in [
            "index_name",
            "index_options",
            "attributes",
            "num_docs",
            "max_doc_id",
            "num_terms",
            "num_records",
            "inverted_sz_mb",
            "offset_vectors_sz_mb",
            "doc_table_size_mb",
            "key_table_size_mb",
            "records_per_doc_avg",
            "bytes_per_record_avg",
            "offsets_per_term_avg",
            "offset_bits_per_record_avg",
        ]:
            assert k in info

        assert decoded_r.ft().index_name == info["index_name"]
        assert num_docs == int(info["num_docs"])

        res = await decoded_r.ft().search("henry iv")
        if is_resp2_connection(decoded_r):
            assert isinstance(res, Result)
            assert 225 == res.total
            # default paging returns 10 documents per page
            assert 10 == len(res.docs)
            assert res.duration > 0

            for doc in res.docs:
                assert doc.id
                assert doc.play == "Henry IV"
                assert len(doc.txt) > 0

            # test no content: documents carry only their id
            res = await decoded_r.ft().search(Query("king").no_content())
            assert 194 == res.total
            assert 10 == len(res.docs)
            for doc in res.docs:
                assert "txt" not in doc.__dict__
                assert "play" not in doc.__dict__

            # test verbatim vs no verbatim: verbatim must match fewer docs
            total = (await decoded_r.ft().search(Query("kings").no_content())).total
            vtotal = (
                await decoded_r.ft().search(Query("kings").no_content().verbatim())
            ).total
            assert total > vtotal

            # test in fields: restricting the searched fields changes the counts
            txt_total = (
                await decoded_r.ft().search(
                    Query("henry").no_content().limit_fields("txt")
                )
            ).total
            play_total = (
                await decoded_r.ft().search(
                    Query("henry").no_content().limit_fields("play")
                )
            ).total
            both_total = (
                await decoded_r.ft().search(
                    Query("henry").no_content().limit_fields("play", "txt")
                )
            ).total
            assert 129 == txt_total
            assert 494 == play_total
            assert 494 == both_total

            # test load_document
            doc = await decoded_r.ft().load_document("henry vi part 3:62")
            assert doc is not None
            assert "henry vi part 3:62" == doc.id
            assert doc.play == "Henry VI Part 3"
            assert len(doc.txt) > 0

            # test in-keys: limit_ids restricts results to exactly the given ids
            ids = [x.id for x in (await decoded_r.ft().search(Query("henry"))).docs]
            assert 10 == len(ids)
            subset = ids[:5]
            docs = await decoded_r.ft().search(Query("henry").limit_ids(*subset))
            assert len(subset) == docs.total
            ids = [x.id for x in docs.docs]
            assert set(ids) == set(subset)

            # test slop and in order
            assert 193 == (await decoded_r.ft().search(Query("henry king"))).total
            assert (
                3
                == (
                    await decoded_r.ft().search(Query("henry king").slop(0).in_order())
                ).total
            )
            assert (
                52
                == (
                    await decoded_r.ft().search(Query("king henry").slop(0).in_order())
                ).total
            )
            assert (
                53 == (await decoded_r.ft().search(Query("henry king").slop(0))).total
            )
            assert (
                167
                == (await decoded_r.ft().search(Query("henry king").slop(100))).total
            )

            # test delete document: delete_document returns 1 for an existing
            # document and 0 once it is gone
            await decoded_r.hset("doc-5ghs2", mapping={"play": "Death of a Salesman"})
            res = await decoded_r.ft().search(Query("death of a salesman"))
            assert 1 == res.total

            assert 1 == await decoded_r.ft().delete_document("doc-5ghs2")
            res = await decoded_r.ft().search(Query("death of a salesman"))
            assert 0 == res.total
            assert 0 == await decoded_r.ft().delete_document("doc-5ghs2")

            # re-adding the hash makes it searchable again; then clean up
            await decoded_r.hset("doc-5ghs2", mapping={"play": "Death of a Salesman"})
            res = await decoded_r.ft().search(Query("death of a salesman"))
            assert 1 == res.total
            await decoded_r.ft().delete_document("doc-5ghs2")
        else:
            # Same checks as above, reading dict-shaped results.
            assert isinstance(res, dict)
            assert 225 == res["total_results"]
            assert 10 == len(res["results"])

            for doc in res["results"]:
                assert doc["id"]
                assert doc["extra_attributes"]["play"] == "Henry IV"
                assert len(doc["extra_attributes"]["txt"]) > 0

            # test no content: results carry no "extra_attributes"
            res = await decoded_r.ft().search(Query("king").no_content())
            assert 194 == res["total_results"]
            assert 10 == len(res["results"])
            for doc in res["results"]:
                assert "extra_attributes" not in doc.keys()

            # test verbatim vs no verbatim
            total = (await decoded_r.ft().search(Query("kings").no_content()))[
                "total_results"
            ]
            vtotal = (
                await decoded_r.ft().search(Query("kings").no_content().verbatim())
            )["total_results"]
            assert total > vtotal

            # test in fields
            txt_total = (
                await decoded_r.ft().search(
                    Query("henry").no_content().limit_fields("txt")
                )
            )["total_results"]
            play_total = (
                await decoded_r.ft().search(
                    Query("henry").no_content().limit_fields("play")
                )
            )["total_results"]
            both_total = (
                await decoded_r.ft().search(
                    Query("henry").no_content().limit_fields("play", "txt")
                )
            )["total_results"]
            assert 129 == txt_total
            assert 494 == play_total
            assert 494 == both_total

            # test load_document (attribute-style access here too)
            doc = await decoded_r.ft().load_document("henry vi part 3:62")
            assert doc is not None
            assert "henry vi part 3:62" == doc.id
            assert doc.play == "Henry VI Part 3"
            assert len(doc.txt) > 0

            # test in-keys
            ids = [
                x["id"]
                for x in (await decoded_r.ft().search(Query("henry")))["results"]
            ]
            assert 10 == len(ids)
            subset = ids[:5]
            docs = await decoded_r.ft().search(Query("henry").limit_ids(*subset))
            assert len(subset) == docs["total_results"]
            ids = [x["id"] for x in docs["results"]]
            assert set(ids) == set(subset)

            # test slop and in order
            assert (
                193
                == (await decoded_r.ft().search(Query("henry king")))["total_results"]
            )
            assert (
                3
                == (
                    await decoded_r.ft().search(Query("henry king").slop(0).in_order())
                )["total_results"]
            )
            assert (
                52
                == (
                    await decoded_r.ft().search(Query("king henry").slop(0).in_order())
                )["total_results"]
            )
            assert (
                53
                == (await decoded_r.ft().search(Query("henry king").slop(0)))[
                    "total_results"
                ]
            )
            assert (
                167
                == (await decoded_r.ft().search(Query("henry king").slop(100)))[
                    "total_results"
                ]
            )

            # test delete document
            await decoded_r.hset("doc-5ghs2", mapping={"play": "Death of a Salesman"})
            res = await decoded_r.ft().search(Query("death of a salesman"))
            assert 1 == res["total_results"]

            assert 1 == await decoded_r.ft().delete_document("doc-5ghs2")
            res = await decoded_r.ft().search(Query("death of a salesman"))
            assert 0 == res["total_results"]
            assert 0 == await decoded_r.ft().delete_document("doc-5ghs2")

            # re-adding the hash makes it searchable again; then clean up
            await decoded_r.hset("doc-5ghs2", mapping={"play": "Death of a Salesman"})
            res = await decoded_r.ft().search(Query("death of a salesman"))
            assert 1 == res["total_results"]
            await decoded_r.ft().delete_document("doc-5ghs2")

    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    @skip_if_server_version_gte("7.9.0")
    async def test_scores(self, decoded_r: redis.Redis):
        """WITHSCORES returns doc2 first with a score of exactly 3.0.

        Both documents match ``foo ~bar``; doc2 (which also contains ``bar``)
        must rank first. This variant covers servers older than 7.9.
        """
        search = decoded_r.ft()
        await search.create_index((TextField("txt"),))

        await decoded_r.hset("doc1", mapping={"txt": "foo baz"})
        await decoded_r.hset("doc2", mapping={"txt": "foo bar"})

        res = await search.search(Query("foo ~bar").with_scores())
        if not is_resp2_connection(decoded_r):
            assert 2 == res["total_results"]
            ranked = res["results"]
            assert "doc2" == ranked[0]["id"]
            assert 3.0 == ranked[0]["score"]
            assert "doc1" == ranked[1]["id"]
            return

        assert 2 == res.total
        ranked = res.docs
        assert "doc2" == ranked[0].id
        assert 3.0 == ranked[0].score
        assert "doc1" == ranked[1].id

    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    @skip_if_server_version_lt("7.9.0")
    async def test_scores_with_new_default_scorer(self, decoded_r: redis.Redis):
        """WITHSCORES on servers >= 7.9: doc2 ranks first with a score near 0.87."""
        search = decoded_r.ft()
        await search.create_index((TextField("txt"),))

        await decoded_r.hset("doc1", mapping={"txt": "foo baz"})
        await decoded_r.hset("doc2", mapping={"txt": "foo bar"})

        res = await search.search(Query("foo ~bar").with_scores())
        if not is_resp2_connection(decoded_r):
            assert 2 == res["total_results"]
            ranked = res["results"]
            assert "doc2" == ranked[0]["id"]
            assert 0.87 == pytest.approx(ranked[0]["score"], 0.01)
            assert "doc1" == ranked[1]["id"]
            return

        assert 2 == res.total
        ranked = res.docs
        assert "doc2" == ranked[0].id
        assert 0.87 == pytest.approx(ranked[0].score, 0.01)
        assert "doc1" == ranked[1].id

    @pytest.mark.redismod
    async def test_stopwords(self, decoded_r: redis.Redis):
        """A query made only of custom stopwords matches nothing.

        Adding real terms to that query makes it match the matching document.
        """
        await decoded_r.ft().create_index(
            (TextField("txt"),), stopwords=["foo", "bar", "baz"]
        )
        await decoded_r.hset("doc1", mapping={"txt": "foo bar"})
        await decoded_r.hset("doc2", mapping={"txt": "hello world"})
        await self.waitForIndex(decoded_r, "idx")

        only_stopwords = await decoded_r.ft().search(Query("foo bar").no_content())
        with_real_terms = await decoded_r.ft().search(
            Query("foo bar hello world").no_content()
        )
        if is_resp2_connection(decoded_r):
            totals = (only_stopwords.total, with_real_terms.total)
        else:
            totals = (
                only_stopwords["total_results"],
                with_real_terms["total_results"],
            )
        assert totals == (0, 1)

    @pytest.mark.redismod
    async def test_filters(self, decoded_r: redis.Redis):
        """Numeric and geo filters narrow the results of a full-text query."""
        await decoded_r.ft().create_index(
            (TextField("txt"), NumericField("num"), GeoField("loc"))
        )
        fixtures = {
            "doc1": {"txt": "foo bar", "num": 3.141, "loc": "-0.441,51.458"},
            "doc2": {"txt": "foo baz", "num": 2, "loc": "-0.1,51.2"},
        }
        for key, mapping in fixtures.items():
            await decoded_r.hset(key, mapping=mapping)

        await self.waitForIndex(decoded_r, "idx")
        resp2 = is_resp2_connection(decoded_r)

        def total(result):
            return result.total if resp2 else result["total_results"]

        def doc_id(result, pos):
            return result.docs[pos].id if resp2 else result["results"][pos]["id"]

        # Numeric filters: [0, 2] keeps only doc2, (2, +inf) keeps only doc1.
        closed_range = Query("foo").add_filter(NumericFilter("num", 0, 2)).no_content()
        open_range = (
            Query("foo")
            .add_filter(NumericFilter("num", 2, NumericFilter.INF, minExclusive=True))
            .no_content()
        )
        res1 = await decoded_r.ft().search(closed_range)
        res2 = await decoded_r.ft().search(open_range)
        assert 1 == total(res1)
        assert 1 == total(res2)
        assert "doc2" == doc_id(res1, 0)
        assert "doc1" == doc_id(res2, 0)

        # Geo filters around the same point: radius 10 reaches only doc1,
        # radius 100 reaches both documents.
        near = Query("foo").add_filter(GeoFilter("loc", -0.44, 51.45, 10)).no_content()
        far = Query("foo").add_filter(GeoFilter("loc", -0.44, 51.45, 100)).no_content()
        res1 = await decoded_r.ft().search(near)
        res2 = await decoded_r.ft().search(far)
        assert 1 == total(res1)
        assert 2 == total(res2)
        assert "doc1" == doc_id(res1, 0)

        # Sort results, after RDB reload order may change
        assert ["doc1", "doc2"] == sorted([doc_id(res2, 0), doc_id(res2, 1)])

    @pytest.mark.redismod
    async def test_sort_by(self, decoded_r: redis.Redis):
        """SORTBY on a sortable numeric field orders results both ways."""
        await decoded_r.ft().create_index(
            (TextField("txt"), NumericField("num", sortable=True))
        )
        for num, word in enumerate(("bar", "baz", "qux"), start=1):
            await decoded_r.hset(f"doc{num}", mapping={"txt": f"foo {word}", "num": num})

        ascending = await decoded_r.ft().search(
            Query("foo").sort_by("num", asc=True).no_content()
        )
        descending = await decoded_r.ft().search(
            Query("foo").sort_by("num", asc=False).no_content()
        )

        if is_resp2_connection(decoded_r):
            asc_total = ascending.total
            asc_ids = [doc.id for doc in ascending.docs]
            desc_total = descending.total
            desc_ids = [doc.id for doc in descending.docs]
        else:
            asc_total = ascending["total_results"]
            asc_ids = [doc["id"] for doc in ascending["results"]]
            desc_total = descending["total_results"]
            desc_ids = [doc["id"] for doc in descending["results"]]

        assert 3 == asc_total
        assert ["doc1", "doc2", "doc3"] == asc_ids
        assert 3 == desc_total
        assert ["doc3", "doc2", "doc1"] == desc_ids

    @pytest.mark.redismod
    @skip_ifmodversion_lt("2.0.0", "search")
    async def test_drop_index(self, decoded_r: redis.Redis):
        """
        Ensure the index gets dropped and its documents are deleted or kept
        according to the ``delete_documents`` flag
        """
        # Repeat the create/drop cycle to catch flakiness.
        for _ in range(20):
            for delete_docs, expected in [(True, {}), (False, {"name": "haveit"})]:
                idx = "HaveIt"
                index = self.getClient(decoded_r)
                await index.hset("index:haveit", mapping={"name": "haveit"})
                idef = IndexDefinition(prefix=["index:"])
                await index.ft(idx).create_index((TextField("name"),), definition=idef)
                await self.waitForIndex(index, idx)
                await index.ft(idx).dropindex(delete_documents=delete_docs)
                # The hash is gone ({}) only when the documents were deleted.
                assert await index.hgetall("index:haveit") == expected

    @pytest.mark.redismod
    async def test_example(self, decoded_r: redis.Redis):
        """Smoke test: build an index, add one document, run a paged query."""
        search = decoded_r.ft()
        # Schema: a heavily weighted title plus a plain body field.
        await search.create_index((TextField("title", weight=5.0), TextField("body")))

        document = {
            "title": "RediSearch",
            "body": "Redisearch impements a search engine on top of redis",
        }
        await decoded_r.hset("doc1", mapping=document)

        query = Query("search engine").verbatim().no_content().paging(0, 5)
        assert await search.search(query) is not None

    @pytest.mark.redismod
    async def test_auto_complete(self, decoded_r: redis.Redis):
        """Exercise SUGADD / SUGLEN / SUGGET / SUGDEL on the ``ac`` dictionary.

        Every row of ``TITLES_CSV`` is ``term,score``; the test expects each
        SUGADD to return the running number of suggestions.
        """
        n = 0
        # The csv module requires files opened with newline=""; an explicit
        # encoding keeps the read independent of the platform locale.
        with open(TITLES_CSV, encoding="utf-8", newline="") as f:
            cr = csv.reader(f)

            for row in cr:
                n += 1
                term, score = row[0], float(row[1])
                assert n == await decoded_r.ft().sugadd(
                    "ac", Suggestion(term, score=score)
                )

        assert n == await decoded_r.ft().suglen("ac")
        # Prefix lookup with scores: exactly two titles start with "bad".
        ret = await decoded_r.ft().sugget("ac", "bad", with_scores=True)
        assert 2 == len(ret)
        assert "badger" == ret[0].string
        assert isinstance(ret[0].score, float)
        assert 1.0 != ret[0].score
        assert "badalte rishtey" == ret[1].string
        assert isinstance(ret[1].score, float)
        assert 1.0 != ret[1].score

        # Fuzzy lookup without with_scores: the score defaults to 1.0.
        ret = await decoded_r.ft().sugget("ac", "bad", fuzzy=True, num=10)
        assert 10 == len(ret)
        assert 1.0 == ret[0].score
        strs = {x.string for x in ret}

        for sug in strs:
            assert 1 == await decoded_r.ft().sugdel("ac", sug)
        # make sure a second delete returns 0
        for sug in strs:
            assert 0 == await decoded_r.ft().sugdel("ac", sug)

        # make sure they were actually deleted
        ret2 = await decoded_r.ft().sugget("ac", "bad", fuzzy=True, num=10)
        for sug in ret2:
            assert sug.string not in strs

        # Test with payload
        await decoded_r.ft().sugadd("ac", Suggestion("pay1", payload="pl1"))
        await decoded_r.ft().sugadd("ac", Suggestion("pay2", payload="pl2"))
        await decoded_r.ft().sugadd("ac", Suggestion("pay3", payload="pl3"))

        sugs = await decoded_r.ft().sugget(
            "ac", "pay", with_payloads=True, with_scores=True
        )
        assert 3 == len(sugs)
        for sug in sugs:
            assert sug.payload
            assert sug.payload.startswith("pl")

    @pytest.mark.redismod
    async def test_no_index(self, decoded_r: redis.Redis):
        """NOINDEX fields are not searchable but remain usable for sorting.

        It also checks that declaring a field both NOINDEX and non-sortable
        raises on construction.
        """
        await decoded_r.ft().create_index(
            (
                TextField("field"),
                TextField("text", no_index=True, sortable=True),
                NumericField("numeric", no_index=True, sortable=True),
                GeoField("geo", no_index=True, sortable=True),
                TagField("tag", no_index=True, sortable=True),
            )
        )

        for num, field_value in ((1, "aaa"), (2, "aab")):
            value = str(num)
            await decoded_r.hset(
                f"doc{num}",
                mapping={
                    "field": field_value,
                    "text": value,
                    "numeric": value,
                    "geo": f"{value},{value}",
                    "tag": value,
                },
            )
        await self.waitForIndex(decoded_r, "idx")

        resp2 = is_resp2_connection(decoded_r)

        def total(result):
            return result.total if resp2 else result["total_results"]

        def first_id(result):
            return result.docs[0].id if resp2 else result["results"][0]["id"]

        async def search(query):
            return await decoded_r.ft().search(query)

        # The NOINDEX "text" field is not searchable; the indexed "field" is.
        assert 0 == total(await search(Query("@text:aa*")))
        assert 2 == total(await search(Query("@field:aa*")))

        res = await search(Query("*").sort_by("text", asc=False))
        assert 2 == total(res)
        assert "doc2" == first_id(res)

        # Every NOINDEX+SORTABLE field can still drive an ascending sort.
        for sort_field in ("text", "numeric", "geo", "tag"):
            res = await search(Query("*").sort_by(sort_field, asc=True))
            assert "doc1" == first_id(res)

        # Ensure exception is raised for non-indexable, non-sortable fields
        for field_cls in (TextField, NumericField, GeoField, TagField):
            with pytest.raises(Exception):
                field_cls("name", no_index=True, sortable=False)

    @pytest.mark.redismod
    @skip_if_server_version_lt("7.4.0")
    @skip_ifmodversion_lt("2.10.0", "search")
    async def test_create_index_empty_or_missing_fields_with_sortable(
        self,
        decoded_r: redis.Redis,
    ):
        """The INDEXEMPTY / INDEXMISSING options can be combined with SORTABLE.

        The schema also includes a NOINDEX field; the index must be created
        without error.
        """
        schema = [
            TextField("title", sortable=True, index_empty=True),
            TagField("features", index_missing=True, sortable=True),
            TextField("description", no_index=True, sortable=True),
        ]
        await decoded_r.ft().create_index(
            schema,
            definition=IndexDefinition(prefix=["property:"], index_type=IndexType.HASH),
        )

    @pytest.mark.redismod
    async def test_explain(self, decoded_r: redis.Redis):
        """FT.EXPLAIN returns a non-empty plan for a multi-field query."""
        search = decoded_r.ft()
        await search.create_index(tuple(TextField(f"f{i}") for i in (1, 2, 3)))
        plan = await search.explain("@f3:f3_val @f2:f2_val @f1:f1_val")
        assert plan

    @pytest.mark.redismod
    async def test_explaincli(self, decoded_r: redis.Redis):
        """The async search client does not implement ``explain_cli``."""
        search = decoded_r.ft()
        with pytest.raises(NotImplementedError):
            await search.explain_cli("foo")

    @pytest.mark.redismod
    async def test_summarize(self, decoded_r: redis.Redis):
        """HIGHLIGHT and SUMMARIZE wrap matched terms and truncate long fields."""
        await self.createIndex(decoded_r.ft())
        await self.waitForIndex(decoded_r, "idx")

        expected_txt = "ACT I SCENE I. London. The palace. Enter <b>KING</b> <b>HENRY</b>, LORD JOHN OF LANCASTER, the EARL of WESTMORELAND, SIR... "  # noqa
        resp2 = is_resp2_connection(decoded_r)

        async def first_play_and_txt(query):
            # Returns the (play, txt) fields of the first sorted result.
            found = await decoded_r.ft().search(query)
            if resp2:
                doc = sorted(found.docs)[0]
                return doc.play, doc.txt
            doc = sorted(found["results"])[0]
            return doc["extra_attributes"]["play"], doc["extra_attributes"]["txt"]

        # Explicit highlight tags on both fields, summarize only "txt".
        q = Query('"king henry"').paging(0, 1)
        q.highlight(fields=("play", "txt"), tags=("<b>", "</b>"))
        q.summarize("txt")
        play, txt = await first_play_and_txt(q)
        assert "<b>Henry</b> IV" == play
        assert expected_txt == txt

        # Default summarize + highlight applied to all fields.
        q = Query('"king henry"').paging(0, 1).summarize().highlight()
        play, txt = await first_play_and_txt(q)
        assert "<b>Henry</b> ... " == play
        assert expected_txt == txt

    @pytest.mark.redismod
    @skip_ifmodversion_lt("2.0.0", "search")
    async def test_alias(self, decoded_r: redis.Redis):
        """Exercise FT.ALIASADD / FT.ALIASUPDATE / FT.ALIASDEL across two indexes.

        The alias ``spaceballs`` first points at ``testAlias`` (prefix
        ``index1:``) and is then moved to ``testAlias2`` (prefix ``index2:``).
        Finally it is deleted, after which searching through it must fail.
        """
        index1 = self.getClient(decoded_r)
        index2 = self.getClient(decoded_r)

        def1 = IndexDefinition(prefix=["index1:"])
        def2 = IndexDefinition(prefix=["index2:"])

        ftindex1 = index1.ft("testAlias")
        ftindex2 = index2.ft("testAlias2")
        await ftindex1.create_index((TextField("name"),), definition=def1)
        await ftindex2.create_index((TextField("name"),), definition=def2)

        await index1.hset("index1:lonestar", mapping={"name": "lonestar"})
        await index2.hset("index2:yogurt", mapping={"name": "yogurt"})

        if is_resp2_connection(decoded_r):
            res = (await ftindex1.search("*")).docs[0]
            assert "index1:lonestar" == res.id

            # create alias and check for results
            await ftindex1.aliasadd("spaceballs")
            alias_client = self.getClient(decoded_r).ft("spaceballs")
            res = (await alias_client.search("*")).docs[0]
            assert "index1:lonestar" == res.id

            # Throw an exception when trying to add an alias that already exists
            with pytest.raises(Exception):
                await ftindex2.aliasadd("spaceballs")

            # update alias and ensure new results
            await ftindex2.aliasupdate("spaceballs")
            alias_client2 = self.getClient(decoded_r).ft("spaceballs")

            res = (await alias_client2.search("*")).docs[0]
            assert "index2:yogurt" == res.id
        else:
            res = (await ftindex1.search("*"))["results"][0]
            assert "index1:lonestar" == res["id"]

            # create alias and check for results
            await ftindex1.aliasadd("spaceballs")
            alias_client = self.getClient(decoded_r).ft("spaceballs")
            res = (await alias_client.search("*"))["results"][0]
            assert "index1:lonestar" == res["id"]

            # Throw an exception when trying to add an alias that already exists
            with pytest.raises(Exception):
                await ftindex2.aliasadd("spaceballs")

            # update alias and ensure new results
            await ftindex2.aliasupdate("spaceballs")
            alias_client2 = self.getClient(decoded_r).ft("spaceballs")

            res = (await alias_client2.search("*"))["results"][0]
            assert "index2:yogurt" == res["id"]

        await ftindex2.aliasdel("spaceballs")
        # Searching through the deleted alias must be rejected by the server.
        # Don't touch attributes of the result here: dict-shaped results have no
        # ``.docs``, and the AttributeError would make the check pass vacuously.
        with pytest.raises(ResponseError):
            await alias_client2.search("*")

    @pytest.mark.redismod
    @pytest.mark.xfail(strict=False)
    async def test_alias_basic(self, decoded_r: redis.Redis):
        """Add an alias to one index, re-point it to another, then delete it.

        Both indexes are created without an ``IndexDefinition``, so presumably
        both may cover every hash key (TODO confirm).
        ``doc2`` is written after ``testAlias2`` exists, so after the alias is
        updated we require ``doc2`` to be visible through it.
        """
        # Creating a client with one index
        client = self.getClient(decoded_r)
        await client.flushdb()
        index1 = self.getClient(decoded_r).ft("testAlias")

        await index1.create_index((TextField("txt"),))
        await index1.client.hset("doc1", mapping={"txt": "text goes here"})

        index2 = self.getClient(decoded_r).ft("testAlias2")
        await index2.create_index((TextField("txt"),))
        await index2.client.hset("doc2", mapping={"txt": "text goes here"})

        # add the actual alias and check
        await index1.aliasadd("myalias")
        alias_client = self.getClient(decoded_r).ft("myalias")
        if is_resp2_connection(decoded_r):
            res = sorted((await alias_client.search("*")).docs, key=lambda x: x.id)
            assert "doc1" == res[0].id

            # Throw an exception when trying to add an alias that already exists
            with pytest.raises(Exception):
                await index2.aliasadd("myalias")

            # update the alias and ensure we get doc2
            await index2.aliasupdate("myalias")
            alias_client2 = self.getClient(decoded_r).ft("myalias")
            res = (await alias_client2.search("*")).docs
            assert "doc2" in {doc.id for doc in res}
        else:
            res = sorted(
                (await alias_client.search("*"))["results"], key=lambda x: x["id"]
            )
            assert "doc1" == res[0]["id"]

            # Throw an exception when trying to add an alias that already exists
            with pytest.raises(Exception):
                await index2.aliasadd("myalias")

            # update the alias and ensure we get doc2; build the client the
            # same way as the RESP2 branch does
            await index2.aliasupdate("myalias")
            alias_client2 = self.getClient(decoded_r).ft("myalias")
            res = (await alias_client2.search("*"))["results"]
            assert "doc2" in {doc["id"] for doc in res}

        # delete the alias and expect an error if we try to query again
        await index2.aliasdel("myalias")
        # Only the search itself may satisfy this check. Indexing ``.docs`` on a
        # RESP3 dict reply would raise AttributeError even if the search worked.
        with pytest.raises(redis.ResponseError):
            await alias_client2.search("*")

    @pytest.mark.redismod
    async def test_tags(self, decoded_r: redis.Redis):
        """Each tag query (plain, multi-word, escaped) matches exactly one
        document, and ``tagvals`` returns every tag stored across both docs."""
        await decoded_r.ft().create_index((TextField("txt"), TagField("tags")))
        tags = "foo,foo bar,hello;world"
        tags2 = "soba,ramen"

        await decoded_r.hset("doc1", mapping={"txt": "fooz barz", "tags": tags})
        await decoded_r.hset("doc2", mapping={"txt": "noodles", "tags": tags2})
        await self.waitForIndex(decoded_r, "idx")

        resp2 = is_resp2_connection(decoded_r)
        # A plain tag, a tag containing a space (unescaped and escaped), and a
        # tag containing an escaped ";" must each match exactly one document.
        for query_string in (
            "@tags:{foo}",
            "@tags:{foo bar}",
            "@tags:{foo\\ bar}",
            "@tags:{hello\\;world}",
        ):
            res = await decoded_r.ft().search(Query(query_string))
            total = res.total if resp2 else res["total_results"]
            assert 1 == total, query_string

        q2 = await decoded_r.ft().tagvals("tags")
        # Compare as sets for both protocols. The previous RESP2 check compared
        # two ``list.sort()`` results, which are always None, so it never failed.
        assert set(tags.split(",") + tags2.split(",")) == set(q2)

    @pytest.mark.redismod
    async def test_textfield_sortable_nostem(self, decoded_r: redis.Redis):
        """A text field declared SORTABLE and NOSTEM reports both flags in
        the index info."""
        txt_field = TextField("txt", sortable=True, no_stem=True)
        await decoded_r.ft().create_index((txt_field,))

        info = await decoded_r.ft().info()
        first_attribute = info["attributes"][0]
        # RESP2: the flags are members of the attribute entry itself.
        # RESP3: they live under the attribute's "flags" key.
        flags = (
            first_attribute
            if is_resp2_connection(decoded_r)
            else first_attribute["flags"]
        )
        for flag in ("SORTABLE", "NOSTEM"):
            assert flag in flags

    @pytest.mark.redismod
    async def test_alter_schema_add(self, decoded_r: redis.Redis):
        """A field appended with ``alter_schema_add`` is searchable for
        documents written afterwards."""
        ft_index = decoded_r.ft()
        await ft_index.create_index(TextField("title"))
        # Extend the schema with a second text field after creation.
        await ft_index.alter_schema_add(TextField("body"))

        await decoded_r.hset(
            "doc1",
            mapping={"title": "MyTitle", "body": "Some content only in the body"},
        )

        # This phrase occurs only in the added "body" field.
        result = await ft_index.search(Query("only in the body"))
        total = (
            result.total if is_resp2_connection(decoded_r) else result["total_results"]
        )
        assert 1 == total

    @pytest.mark.redismod
    async def test_spell_check(self, decoded_r: redis.Redis):
        """FT.SPELLCHECK: default suggestions, a larger edit distance, and a
        custom dictionary used as INCLUDE and EXCLUDE terms."""
        index = decoded_r.ft()
        await index.create_index((TextField("f1"), TextField("f2")))

        await decoded_r.hset(
            "doc1", mapping={"f1": "some valid content", "f2": "this is sample text"}
        )
        await decoded_r.hset(
            "doc2", mapping={"f1": "very important", "f2": "lorem ipsum"}
        )
        await self.waitForIndex(decoded_r, "idx")

        if is_resp2_connection(decoded_r):
            # RESP2 shape: {term: [{"suggestion": ..., "score": ...}, ...]}
            checked = await index.spellcheck("impornant")
            assert "important" == checked["impornant"][0]["suggestion"]

            checked = await index.spellcheck("contnt")
            assert "content" == checked["contnt"][0]["suggestion"]

            # No suggestion at the default distance, "valid" at distance 2.
            assert (await index.spellcheck("vlis")) == {}
            checked = await index.spellcheck("vlis", distance=2)
            assert "valid" == checked["vlis"][0]["suggestion"]

            # Terms from a custom dictionary become candidates via INCLUDE.
            await index.dict_add("dict", "lore", "lorem", "lorm")
            checked = await index.spellcheck("lorm", include="dict")
            lorm = checked["lorm"]
            assert len(lorm) == 3
            assert tuple(entry["suggestion"] for entry in lorm) == (
                "lorem",
                "lore",
                "lorm",
            )
            assert (lorm[0]["score"], lorm[1]["score"]) == ("0.5", "0")

            # EXCLUDE drops dictionary terms from the result.
            assert (await index.spellcheck("lorm", exclude="dict")) == {}
        else:
            # RESP3 shape: {"results": {term: [{suggestion: score}, ...]}}
            checked = await index.spellcheck("impornant")
            assert "important" in checked["results"]["impornant"][0]

            checked = await index.spellcheck("contnt")
            assert "content" in checked["results"]["contnt"][0]

            # No suggestion at the default distance, "valid" at distance 2.
            assert (await index.spellcheck("vlis")) == {"results": {"vlis": []}}
            checked = await index.spellcheck("vlis", distance=2)
            assert "valid" in checked["results"]["vlis"][0]

            # Terms from a custom dictionary become candidates via INCLUDE.
            await index.dict_add("dict", "lore", "lorem", "lorm")
            checked = await index.spellcheck("lorm", include="dict")
            lorm = checked["results"]["lorm"]
            assert len(lorm) == 3
            for position, word in enumerate(("lorem", "lore", "lorm")):
                assert word in lorm[position]
            assert (lorm[0]["lorem"], lorm[1]["lore"]) == (0.5, 0)

            # EXCLUDE drops dictionary terms from the result.
            assert (await index.spellcheck("lorm", exclude="dict")) == {"results": {}}

    @pytest.mark.redismod
    async def test_dict_operations(self, decoded_r: redis.Redis):
        """Round-trip a custom dictionary through add, delete and dump."""
        index = decoded_r.ft()
        await index.create_index((TextField("f1"), TextField("f2")))
        dict_name = "custom_dict"

        added = await index.dict_add(dict_name, "item1", "item2", "item3")
        assert 3 == added

        removed = await index.dict_del(dict_name, "item2")
        assert 1 == removed

        remaining = await index.dict_dump(dict_name)
        assert remaining == ["item1", "item3"]

        # Delete what is left so the dictionary ends up empty.
        await index.dict_del(dict_name, *remaining)

    @pytest.mark.redismod
    async def test_phonetic_matcher(self, decoded_r: redis.Redis):
        """A plain text field matches "Jon" literally. The same field with the
        ``dm:en`` phonetic matcher also matches "John"."""

        async def index_and_search(name_field):
            # Create the default index over *name_field*, store both
            # spellings, and search for "Jon".
            await decoded_r.ft().create_index((name_field,))
            await decoded_r.hset("doc1", mapping={"name": "Jon"})
            await decoded_r.hset("doc2", mapping={"name": "John"})
            return await decoded_r.ft().search(Query("Jon"))

        plain = await index_and_search(TextField("name"))
        if is_resp2_connection(decoded_r):
            assert 1 == len(plain.docs)
            assert "Jon" == plain.docs[0].name
        else:
            assert 1 == plain["total_results"]
            assert "Jon" == plain["results"][0]["extra_attributes"]["name"]

        # Flush the database before recreating the index with the matcher.
        await decoded_r.flushdb()

        phonetic = await index_and_search(
            TextField("name", phonetic_matcher="dm:en")
        )
        if is_resp2_connection(decoded_r):
            assert 2 == len(phonetic.docs)
            assert ["John", "Jon"] == sorted(d.name for d in phonetic.docs)
        else:
            assert 2 == phonetic["total_results"]
            names = (d["extra_attributes"]["name"] for d in phonetic["results"])
            assert ["John", "Jon"] == sorted(names)

    @pytest.mark.redismod
    async def test_get(self, decoded_r: redis.Redis):
        """FT.GET yields None for missing keys and a flat field/value list per
        existing hash, in the order the keys were requested."""
        index = decoded_r.ft()
        await index.create_index((TextField("f1"), TextField("f2")))

        # Nothing has been written yet, so every lookup misses.
        assert [None] == await index.get("doc1")
        assert [None, None] == await index.get("doc2", "doc1")

        documents = {
            "doc1": {"f1": "some valid content dd1", "f2": "this is sample text f1"},
            "doc2": {"f1": "some valid content dd2", "f2": "this is sample text f2"},
        }
        for key, fields in documents.items():
            await decoded_r.hset(key, mapping=fields)

        def flattened(key):
            # [field1, value1, field2, value2] in insertion order.
            return [item for pair in documents[key].items() for item in pair]

        assert [flattened("doc2")] == await index.get("doc2")
        assert [flattened("doc1"), flattened("doc2")] == await index.get(
            "doc1", "doc2"
        )

    @pytest.mark.redismod
    async def test_query_timeout(self, decoded_r: redis.Redis):
        """TIMEOUT is serialized into the query arguments, and a non-numeric
        timeout makes the search fail with a ResponseError."""
        valid = Query("foo").timeout(5000)
        expected_args = ["foo", "TIMEOUT", 5000, "DIALECT", 2, "LIMIT", 0, 10]
        assert valid.get_args() == expected_args

        invalid = Query("foo").timeout("not_a_number")
        with pytest.raises(redis.ResponseError):
            await decoded_r.ft().search(invalid)

    @pytest.mark.redismod
    @skip_if_resp_version(3)
    async def test_binary_and_text_fields(self, decoded_r: redis.Redis):
        """Return a raw (undecoded) bytes field alongside a decoded UTF-8 tag
        field from the same hash, and check both round-trip unchanged."""
        fake_vec = np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32)

        index_name = "mixed_index"
        doc_key = f"{index_name}:1"
        mixed_data = {"first_name": "🐍python", "vector_emb": fake_vec.tobytes()}
        await decoded_r.hset(doc_key, mapping=mixed_data)

        # NOTE(review): the schema's vector field is "embeddings_bio", which the
        # stored hash does not contain ("vector_emb" holds the bytes). Confirm
        # that this mismatch is intentional.
        schema = [
            TagField("first_name"),
            VectorField(
                "embeddings_bio",
                algorithm="HNSW",
                attributes={"TYPE": "FLOAT32", "DIM": 4, "DISTANCE_METRIC": "COSINE"},
            ),
        ]
        definition = IndexDefinition(
            prefix=[f"{index_name}:"], index_type=IndexType.HASH
        )
        index = decoded_r.ft(index_name)
        await index.create_index(fields=schema, definition=definition)
        await self.waitForIndex(decoded_r, index_name)

        # "vector_emb" must come back as raw bytes; "first_name" is decoded.
        query = Query("*").return_field("vector_emb", decode_field=False)
        query = query.return_field("first_name")
        result = await index.search(query=query, query_params={})
        docs = result.docs

        if not docs:
            # Looked up only on failure, to show what the hash actually holds.
            hash_content = await decoded_r.hget(doc_key, "first_name")
            assert docs, (
                f"Returned search results are empty. Result: {result}; Hash: {hash_content}"
            )

        top = docs[0]
        roundtrip_vec = np.frombuffer(top["vector_emb"], dtype=np.float32)
        assert np.array_equal(roundtrip_vec, fake_vec), "The vectors are not equal"
        assert top["first_name"] == mixed_data["first_name"], (
            "The text field is not decoded correctly"
        )


class TestScorers(AsyncSearchTestsBase):
    """Top-hit scores for the term "quick" under each built-in scorer."""

    # (scorer name, expected score of the first hit) for explicitly chosen scorers.
    _EXPLICIT_SCORER_SCORES = (
        ("TFIDF", 1.0),
        ("TFIDF.DOCNORM", 0.14285714285714285),
        ("BM25", 0.22471909420069797),
        ("DISMAX", 2.0),
        ("DOCSCORE", 1.0),
        ("HAMMING", 0.0),
    )

    @staticmethod
    async def _seed_scorer_corpus(client):
        """Create the default index over ``description`` and store the two
        documents the expected scores were computed against."""
        await client.ft().create_index((TextField("description"),))
        await client.hset(
            "doc1",
            mapping={"description": "The quick brown fox jumps over the lazy dog"},
        )
        await client.hset(
            "doc2",
            mapping={
                "description": "Quick alice was beginning to get very tired of sitting by her quick sister on the bank, and of having nothing to do."  # noqa
            },
        )

    @staticmethod
    def _top_score(result, resp2):
        """Return the first hit's score from a RESP2 or RESP3 search reply."""
        if resp2:
            return result.docs[0].score
        return result["results"][0]["score"]

    async def _assert_explicit_scorers(self, client, resp2):
        """Search "quick" with every scorer in the table and check its score."""
        for scorer_name, expected in self._EXPLICIT_SCORER_SCORES:
            query = Query("quick").scorer(scorer_name).with_scores()
            result = await client.ft().search(query)
            assert expected == self._top_score(result, resp2)

    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    # NOTE(imalinovskyi): This test contains hardcoded scores valid only for RediSearch 2.8+
    @skip_ifmodversion_lt("2.8.0", "search")
    @skip_if_server_version_gte("7.9.0")
    async def test_scorer(self, decoded_r: redis.Redis):
        await self._seed_scorer_corpus(decoded_r)
        resp2 = is_resp2_connection(decoded_r)

        # default scorer is TFIDF
        result = await decoded_r.ft().search(Query("quick").with_scores())
        assert 1.0 == self._top_score(result, resp2)
        await self._assert_explicit_scorers(decoded_r, resp2)

    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    # NOTE(imalinovskyi): This test contains hardcoded scores valid only for RediSearch 2.8+
    @skip_ifmodversion_lt("2.8.0", "search")
    @skip_if_server_version_lt("7.9.0")
    async def test_scorer_with_new_default_scorer(self, decoded_r: redis.Redis):
        await self._seed_scorer_corpus(decoded_r)
        resp2 = is_resp2_connection(decoded_r)

        # default scorer is BM25STD
        result = await decoded_r.ft().search(Query("quick").with_scores())
        assert 0.23 == pytest.approx(self._top_score(result, resp2), 0.05)
        await self._assert_explicit_scorers(decoded_r, resp2)


class TestConfig(AsyncSearchTestsBase):
    """Set and read a timeout setting: through FT.CONFIG on servers older than
    7.9.0, and through plain CONFIG on 7.9.0 and newer."""

    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    @skip_ifmodversion_lt("2.2.0", "search")
    @skip_if_server_version_gte("7.9.0")
    async def test_config(self, decoded_r: redis.Redis):
        """FT.CONFIG SET accepts a numeric TIMEOUT, rejects a non-numeric one,
        and FT.CONFIG GET reports it via both a wildcard and an exact name."""
        search = decoded_r.ft()
        assert await search.config_set("TIMEOUT", "100")
        with pytest.raises(redis.ResponseError):
            await search.config_set("TIMEOUT", "null")
        for pattern in ("*", "TIMEOUT"):
            settings = await search.config_get(pattern)
            assert "100" == settings["TIMEOUT"]

    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    @skip_if_server_version_lt("7.9.0")
    async def test_config_with_removed_ftconfig(self, decoded_r: redis.Redis):
        """Same round-trip as ``test_config`` through the core CONFIG command."""
        # NOTE(review): plain "timeout" may be a core server setting rather than
        # a search option, and it is not restored afterwards. Confirm intent.
        assert await decoded_r.config_set("timeout", "100")
        with pytest.raises(redis.ResponseError):
            await decoded_r.config_set("timeout", "null")
        for pattern in ("*", "timeout"):
            settings = await decoded_r.config_get(pattern)
            assert "100" == settings["timeout"]


class TestAggregations(AsyncSearchTestsBase):
    @pytest.mark.redismod
    @pytest.mark.onlynoncluster
    async def test_aggregations_groupby(self, decoded_r: redis.Redis):
        """GROUPBY @parent with each reducer yields the expected aggregate,
        under query dialects 1 and 2, reading RESP2 rows or RESP3 results."""
        # Creating the index definition and schema
        await decoded_r.ft().create_index(
            (
                NumericField("random_num"),
                TextField("title"),
                TextField("body"),
                TextField("parent"),
            )
        )

        # Three documents, all with parent "redis"; random_num is 10, 3 and 8.
        corpus = (
            (
                "search",
                {
                    "title": "RediSearch",
                    "body": "Redisearch impements a search engine on top of redis",
                    "parent": "redis",
                    "random_num": 10,
                },
            ),
            (
                "ai",
                {
                    "title": "RedisAI",
                    "body": "RedisAI executes Deep Learning/Machine Learning models and managing their data.",  # noqa
                    "parent": "redis",
                    "random_num": 3,
                },
            ),
            (
                "json",
                {
                    "title": "RedisJson",
                    "body": "RedisJSON implements ECMA-404 The JSON Data Interchange Standard as a native data type.",  # noqa
                    "parent": "redis",
                    "random_num": 8,
                },
            ),
        )
        for key, fields in corpus:
            await decoded_r.hset(key, mapping=fields)

        titles = ["RediSearch", "RedisAI", "RedisJson"]
        # (reducer factory, RESP3 generated alias, expected value). A fresh
        # reducer is built for every request.
        scalar_cases = (
            (lambda: reducers.count(), "__generated_aliascount", "3"),
            (
                lambda: reducers.count_distinct("@title"),
                "__generated_aliascount_distincttitle",
                "3",
            ),
            (
                lambda: reducers.count_distinctish("@title"),
                "__generated_aliascount_distinctishtitle",
                "3",
            ),
            (  # 10+8+3
                lambda: reducers.sum("@random_num"),
                "__generated_aliassumrandom_num",
                "21",
            ),
            (  # min(10,8,3)
                lambda: reducers.min("@random_num"),
                "__generated_aliasminrandom_num",
                "3",
            ),
            (  # max(10,8,3)
                lambda: reducers.max("@random_num"),
                "__generated_aliasmaxrandom_num",
                "10",
            ),
            (  # (10+3+8)/3
                lambda: reducers.avg("@random_num"),
                "__generated_aliasavgrandom_num",
                "7",
            ),
            (
                lambda: reducers.stddev("random_num"),
                "__generated_aliasstddevrandom_num",
                "3.60555127546",
            ),
            (  # median of 3,8,10
                lambda: reducers.quantile("@random_num", 0.5),
                "__generated_aliasquantilerandom_num,0.5",
                "8",
            ),
        )
        resp2 = is_resp2_connection(decoded_r)

        async def first_group(reducer, dialect):
            # Run GROUPBY @parent with *reducer* and return the first group
            # (a flat row on RESP2, a result dict on RESP3).
            request = (
                aggregations.AggregateRequest("redis")
                .group_by("@parent", reducer)
                .dialect(dialect)
            )
            reply = await decoded_r.ft().aggregate(request)
            return reply.rows[0] if resp2 else reply["results"][0]

        for dialect in [1, 2]:
            for make_reducer, resp3_alias, expected in scalar_cases:
                row = await first_group(make_reducer(), dialect)
                if resp2:
                    assert row[1] == "redis"
                    assert row[3] == expected
                else:
                    assert row["extra_attributes"]["parent"] == "redis"
                    assert row["extra_attributes"][resp3_alias] == expected

            row = await first_group(reducers.tolist("@title"), dialect)
            if resp2:
                assert row[1] == "redis"
                assert set(row[3]) == set(titles)
            else:
                attrs = row["extra_attributes"]
                assert attrs["parent"] == "redis"
                assert set(attrs["__generated_aliastolisttitle"]) == set(titles)

            row = await first_group(
                reducers.first_value("@title").alias("first"), dialect
            )
            if resp2:
                assert row == ["parent", "redis", "first", "RediSearch"]
            else:
                assert row["extra_attributes"] == {
                    "parent": "redis",
                    "first": "RediSearch",
                }

            row = await first_group(
                reducers.random_sample("@title", 2).alias("random"), dialect
            )
            if resp2:
                assert row[1] == "redis"
                assert row[2] == "random"
                assert len(row[3]) == 2
                assert row[3][0] in titles
            else:
                attrs = row["extra_attributes"]
                assert attrs["parent"] == "redis"
                assert "random" in attrs
                assert len(attrs["random"]) == 2
                assert attrs["random"][0] in titles

    @pytest.mark.redismod
    async def test_aggregations_sort_by_and_limit(self, decoded_r: redis.Redis):
        """Exercise SORTBY (with/without direction, with MAX) and LIMIT in FT.AGGREGATE."""
        await decoded_r.ft().create_index((TextField("t1"), TextField("t2")))

        await decoded_r.ft().client.hset("doc1", mapping={"t1": "a", "t2": "b"})
        await decoded_r.ft().client.hset("doc2", mapping={"t1": "b", "t2": "a"})

        resp2 = is_resp2_connection(decoded_r)

        # sort_by using SortDirection: t2 ascending, then t1 descending
        by_direction = aggregations.AggregateRequest("*").sort_by(
            aggregations.Asc("@t2"), aggregations.Desc("@t1")
        )
        reply = await decoded_r.ft().aggregate(by_direction)
        if resp2:
            assert reply.rows[0] == ["t2", "a", "t1", "b"]
            assert reply.rows[1] == ["t2", "b", "t1", "a"]
        else:
            rows = reply["results"]
            assert rows[0]["extra_attributes"] == {"t2": "a", "t1": "b"}
            assert rows[1]["extra_attributes"] == {"t2": "b", "t1": "a"}

        # sort_by without SortDirection (ascending by default)
        reply = await decoded_r.ft().aggregate(
            aggregations.AggregateRequest("*").sort_by("@t1")
        )
        if resp2:
            assert reply.rows[0] == ["t1", "a"]
            assert reply.rows[1] == ["t1", "b"]
        else:
            rows = reply["results"]
            assert rows[0]["extra_attributes"] == {"t1": "a"}
            assert rows[1]["extra_attributes"] == {"t1": "b"}

        # sort_by with max: only one row may come back
        reply = await decoded_r.ft().aggregate(
            aggregations.AggregateRequest("*").sort_by("@t1", max=1)
        )
        if resp2:
            assert len(reply.rows) == 1
        else:
            assert len(reply["results"]) == 1

        # limit(offset=1, num=1) skips the first sorted row
        reply = await decoded_r.ft().aggregate(
            aggregations.AggregateRequest("*").sort_by("@t1").limit(1, 1)
        )
        if resp2:
            assert len(reply.rows) == 1
            assert reply.rows[0] == ["t1", "b"]
        else:
            assert len(reply["results"]) == 1
            assert reply["results"][0]["extra_attributes"] == {"t1": "b"}

    @pytest.mark.redismod
    @pytest.mark.experimental
    async def test_withsuffixtrie(self, decoded_r: redis.Redis):
        """FT.INFO reports WITHSUFFIXTRIE only for fields created with that option."""
        # create index
        assert await decoded_r.ft().create_index((TextField("txt"),))
        index_name = getattr(decoded_r.ft(), "index_name", "idx")
        await self.waitForIndex(decoded_r, index_name)
        resp2 = is_resp2_connection(decoded_r)

        async def first_attribute_flags():
            # RESP2: membership is checked on the attribute entry itself;
            # RESP3: on the attribute's "flags" entry.
            attribute = (await decoded_r.ft().info())["attributes"][0]
            return attribute if resp2 else attribute["flags"]

        assert "WITHSUFFIXTRIE" not in await first_attribute_flags()
        assert await decoded_r.ft().dropindex()

        # create withsuffixtrie index (text field)
        assert await decoded_r.ft().create_index(TextField("t", withsuffixtrie=True))
        await self.waitForIndex(decoded_r, index_name)
        assert "WITHSUFFIXTRIE" in await first_attribute_flags()
        assert await decoded_r.ft().dropindex()

        # create withsuffixtrie index (tag field)
        assert await decoded_r.ft().create_index(TagField("t", withsuffixtrie=True))
        await self.waitForIndex(decoded_r, index_name)
        assert "WITHSUFFIXTRIE" in await first_attribute_flags()

    @pytest.mark.redismod
    @skip_ifmodversion_lt("2.10.05", "search")
    async def test_aggregations_add_scores(self, decoded_r: redis.Redis):
        """ADDSCORES exposes each matching document's __score in FT.AGGREGATE."""
        schema = (
            TextField("name", sortable=True, weight=5.0),
            NumericField("age", sortable=True),
        )
        assert await decoded_r.ft().create_index(schema)

        for key, name, age in (("doc1", "bar", "25"), ("doc2", "foo", "19")):
            assert await decoded_r.hset(key, mapping={"name": name, "age": age})

        res = await decoded_r.ft().aggregate(
            aggregations.AggregateRequest("*").add_scores()
        )

        if not isinstance(res, dict):
            # RESP2: rows are flat [name, value, ...] lists
            assert len(res.rows) == 2
            assert all(row == ["__score", "0.2"] for row in res.rows)
            return

        assert len(res["results"]) == 2
        assert all(
            entry["extra_attributes"] == {"__score": "0.2"}
            for entry in res["results"]
        )

    @pytest.mark.redismod
    @skip_ifmodversion_lt("2.10.05", "search")
    async def test_aggregations_hybrid_scoring(self, decoded_r: redis.Redis):
        """Combine a BM25 text score with a KNN distance through APPLY.

        Both documents match ``@description:animal`` and the KNN clause, so the
        aggregation must return two results, each carrying the computed
        ``hybrid_score`` (``@__score + @dist``).
        """
        assert await decoded_r.ft().create_index(
            (
                TextField("name", sortable=True, weight=5.0),
                TextField("description", sortable=True, weight=5.0),
                VectorField(
                    "vector",
                    "HNSW",
                    {"TYPE": "FLOAT32", "DIM": 2, "DISTANCE_METRIC": "COSINE"},
                ),
            )
        )

        assert await decoded_r.hset(
            "doc1",
            mapping={
                "name": "cat book",
                "description": "an animal book about cats",
                "vector": np.array([0.1, 0.2]).astype(np.float32).tobytes(),
            },
        )
        assert await decoded_r.hset(
            "doc2",
            mapping={
                "name": "dog book",
                "description": "an animal book about dogs",
                "vector": np.array([0.2, 0.1]).astype(np.float32).tobytes(),
            },
        )

        query_string = "(@description:animal)=>[KNN 3 @vector $vec_param AS dist]"
        req = (
            aggregations.AggregateRequest(query_string)
            .scorer("BM25")
            .add_scores()
            .apply(hybrid_score="@__score + @dist")
            .load("*")
            .dialect(4)
        )

        res = await decoded_r.ft().aggregate(
            req,
            query_params={
                "vec_param": np.array([0.11, 0.22]).astype(np.float32).tobytes()
            },
        )

        if isinstance(res, dict):
            assert len(res["results"]) == 2
            for result in res["results"]:
                assert "hybrid_score" in result["extra_attributes"]
        else:
            assert len(res.rows) == 2
            for row in res.rows:
                # RESP2 rows are flat [name, value, name, value, ...] lists, so
                # field names sit at the even positions.
                assert "hybrid_score" in row[::2]


class TestPipeline(AsyncSearchTestsBase):
    """Search commands queued through a search-aware async pipeline."""

    @pytest.mark.redismod
    @skip_if_redis_enterprise()
    async def test_search_commands_in_pipeline(self, decoded_r: redis.Redis):
        """Queue FT.CREATE, two HSETs and FT.SEARCH, then check every reply."""
        pipe = await decoded_r.ft().pipeline()
        pipe.create_index((TextField("txt"),))
        for doc_id in ("doc1", "doc2"):
            pipe.hset(doc_id, mapping={"txt": "foo bar"})
        await pipe.search(Query("foo bar").with_payloads())
        replies = await pipe.execute()

        assert replies[:3] == ["OK", True, True]
        search_reply = replies[3]

        if not is_resp2_connection(decoded_r):
            assert 2 == search_reply["total_results"]
            hits = search_reply["results"]
            assert "doc1" == hits[0]["id"]
            assert "doc2" == hits[1]["id"]
            assert hits[0]["payload"] is None
            assert (
                hits[0]["extra_attributes"]
                == hits[1]["extra_attributes"]
                == {"txt": "foo bar"}
            )
            return

        # Flat RESP2 reply: total, then (id, payload, fields) for each document.
        assert 2 == search_reply[0]
        assert "doc1" == search_reply[1]
        assert "doc2" == search_reply[4]
        assert search_reply[5] is None
        assert search_reply[3] == search_reply[6] == ["txt", "foo bar"]

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_pipeline(self, decoded_r: redis.Redis):
        """Queue FT.CREATE, two HSETs and a hybrid search in one pipeline."""
        pipe = decoded_r.ft().pipeline()
        pipe.create_index(
            (
                TextField("txt"),
                VectorField(
                    "embedding",
                    "FLAT",
                    {"TYPE": "FLOAT32", "DIM": 4, "DISTANCE_METRIC": "L2"},
                ),
            )
        )

        documents = {"doc1": [1, 2, 3, 4], "doc2": [1, 2, 2, 3]}
        for doc_id, components in documents.items():
            embedding = np.array(components, dtype=np.float32).tobytes()
            pipe.hset(doc_id, mapping={"txt": "foo bar", "embedding": embedding})

        query_vector = np.array([2, 2, 3, 3], dtype=np.float32).tobytes()
        hybrid_query = HybridQuery(
            HybridSearchQuery("foo"),
            HybridVsimQuery(vector_field_name="@embedding", vector_data=query_vector),
        )

        await pipe.hybrid_search(query=hybrid_query)
        replies = await pipe.execute()

        # the default results count limit is 10
        assert replies[:3] == ["OK", 2, 2]
        hybrid_reply = replies[3]
        if not is_resp2_connection(decoded_r):
            assert hybrid_reply["total_results"] == 2
            assert len(hybrid_reply["results"]) == 2
            assert hybrid_reply["warnings"] == []
            assert hybrid_reply["execution_time"] > 0
            return

        # In a pipeline the RESP2 reply is not parsed into a result object; it
        # stays a flat [label, value, label, value, ...] list.
        assert list(hybrid_reply[0:8:2]) == [
            "total_results",
            "results",
            "warnings",
            "execution_time",
        ]
        total, results, warnings, execution_time = hybrid_reply[1:8:2]
        assert total == 2
        assert len(results) == 2
        assert warnings == []
        assert float(execution_time) > 0


class TestSearchWithVamana(AsyncSearchTestsBase):
    """Async KNN searches against SVS-VAMANA vector indexes."""

    @staticmethod
    async def _store_vectors(client, key_prefix, field, vectors, dtype):
        """HSET ``vectors[i]`` packed as ``dtype`` into ``field`` of ``{key_prefix}{i}``."""
        for position, components in enumerate(vectors):
            packed = np.array(components, dtype=dtype).tobytes()
            await client.hset(f"{key_prefix}{position}", field, packed)

    @staticmethod
    def _assert_top_hit(client, res, expected_total, expected_first_id):
        """Check the hit count and the id of the first returned document."""
        if is_resp2_connection(client):
            assert res.total == expected_total
            assert expected_first_id == res.docs[0].id
        else:
            assert res["total_results"] == expected_total
            assert expected_first_id == res["results"][0]["id"]

    # SVS-VAMANA Async Tests
    @pytest.mark.redismod
    @skip_if_server_version_lt("8.1.224")
    async def test_async_svs_vamana_basic_functionality(self, decoded_r: redis.Redis):
        """KNN on an L2 index returns the query vector's own document first."""
        schema = (
            VectorField(
                "v",
                "SVS-VAMANA",
                {"TYPE": "FLOAT32", "DIM": 4, "DISTANCE_METRIC": "L2"},
            ),
        )
        await decoded_r.ft().create_index(schema)

        vectors = [
            [1.0, 2.0, 3.0, 4.0],
            [2.0, 3.0, 4.0, 5.0],
            [3.0, 4.0, 5.0, 6.0],
            [10.0, 11.0, 12.0, 13.0],
        ]
        await self._store_vectors(decoded_r, "doc", "v", vectors, np.float32)

        knn = (
            Query("*=>[KNN 3 @v $vec]")
            .return_field("__v_score")
            .sort_by("__v_score", True)
        )
        params = {"vec": np.array(vectors[0], dtype=np.float32).tobytes()}
        res = await decoded_r.ft().search(knn, query_params=params)

        self._assert_top_hit(decoded_r, res, 3, "doc0")

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.1.224")
    async def test_async_svs_vamana_distance_metrics(self, decoded_r: redis.Redis):
        """KNN with the COSINE metric."""
        schema = (
            VectorField(
                "v",
                "SVS-VAMANA",
                {"TYPE": "FLOAT32", "DIM": 3, "DISTANCE_METRIC": "COSINE"},
            ),
        )
        await decoded_r.ft().create_index(schema)

        vectors = [
            [1.0, 0.0, 0.0],
            [0.707, 0.707, 0.0],
            [0.0, 1.0, 0.0],
            [-1.0, 0.0, 0.0],
        ]
        await self._store_vectors(decoded_r, "doc", "v", vectors, np.float32)

        knn = Query("*=>[KNN 2 @v $vec as score]").sort_by("score").no_content()
        params = {"vec": np.array(vectors[0], dtype=np.float32).tobytes()}
        res = await decoded_r.ft().search(knn, query_params=params)

        self._assert_top_hit(decoded_r, res, 2, "doc0")

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.1.224")
    async def test_async_svs_vamana_vector_types(self, decoded_r: redis.Redis):
        """KNN over FLOAT16 vectors stored in a separately named index."""
        schema = (
            VectorField(
                "v16",
                "SVS-VAMANA",
                {"TYPE": "FLOAT16", "DIM": 4, "DISTANCE_METRIC": "L2"},
            ),
        )
        await decoded_r.ft("idx16").create_index(schema)

        vectors = [[1.5, 2.5, 3.5, 4.5], [2.5, 3.5, 4.5, 5.5], [3.5, 4.5, 5.5, 6.5]]
        await self._store_vectors(decoded_r, "doc16_", "v16", vectors, np.float16)

        knn = Query("*=>[KNN 2 @v16 $vec as score]").no_content()
        params = {"vec": np.array(vectors[0], dtype=np.float16).tobytes()}
        res = await decoded_r.ft("idx16").search(knn, query_params=params)

        self._assert_top_hit(decoded_r, res, 2, "doc16_0")

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.1.224")
    async def test_async_svs_vamana_compression(self, decoded_r: redis.Redis):
        """KNN on an index created with LVQ8 compression."""
        schema = (
            VectorField(
                "v",
                "SVS-VAMANA",
                {
                    "TYPE": "FLOAT32",
                    "DIM": 8,
                    "DISTANCE_METRIC": "L2",
                    "COMPRESSION": "LVQ8",
                    "TRAINING_THRESHOLD": 1024,
                },
            ),
        )
        await decoded_r.ft().create_index(schema)

        vectors = [[float(i + j) for j in range(8)] for i in range(20)]
        await self._store_vectors(decoded_r, "doc", "v", vectors, np.float32)

        knn = Query("*=>[KNN 5 @v $vec as score]").no_content()
        params = {"vec": np.array(vectors[0], dtype=np.float32).tobytes()}
        res = await decoded_r.ft().search(knn, query_params=params)

        self._assert_top_hit(decoded_r, res, 5, "doc0")

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.1.224")
    async def test_async_svs_vamana_build_parameters(self, decoded_r: redis.Redis):
        """KNN on an index created with explicit graph build/search parameters."""
        schema = (
            VectorField(
                "v",
                "SVS-VAMANA",
                {
                    "TYPE": "FLOAT32",
                    "DIM": 6,
                    "DISTANCE_METRIC": "COSINE",
                    "CONSTRUCTION_WINDOW_SIZE": 300,
                    "GRAPH_MAX_DEGREE": 64,
                    "SEARCH_WINDOW_SIZE": 20,
                    "EPSILON": 0.05,
                },
            ),
        )
        await decoded_r.ft().create_index(schema)

        vectors = [[float(i + j) for j in range(6)] for i in range(15)]
        await self._store_vectors(decoded_r, "doc", "v", vectors, np.float32)

        knn = Query("*=>[KNN 3 @v $vec as score]").no_content()
        params = {"vec": np.array(vectors[0], dtype=np.float32).tobytes()}
        res = await decoded_r.ft().search(knn, query_params=params)

        self._assert_top_hit(decoded_r, res, 3, "doc0")


class TestHybridSearch(AsyncSearchTestsBase):
    async def _create_hybrid_search_index(self, decoded_r: redis.Redis, dim=4):
        """Create the shared hybrid-search index over ``item:``-prefixed hashes.

        The schema has ``description`` (text), ``price``/``size`` (numeric),
        ``color``/``item_type`` (tag) and two FLOAT32 L2 vector fields of
        dimension ``dim``: ``embedding`` (FLAT) and ``embedding-hnsw`` (HNSW).
        Returns once indexing has finished.
        """
        ft = decoded_r.ft()

        def vector_attributes():
            # A fresh dict per field so the two VectorFields never share state.
            return {"TYPE": "FLOAT32", "DIM": dim, "DISTANCE_METRIC": "L2"}

        await ft.create_index(
            (
                TextField("description"),
                NumericField("price"),
                TagField("color"),
                TagField("item_type"),
                NumericField("size"),
                VectorField("embedding", "FLAT", vector_attributes()),
                VectorField("embedding-hnsw", "HNSW", vector_attributes()),
            ),
            definition=IndexDefinition(prefix=["item:"]),
        )
        # Wait on the index this handle actually targets rather than a
        # hard-coded name, so the wait stays correct if the default changes.
        await AsyncSearchTestsBase.waitForIndex(
            decoded_r, getattr(ft, "index_name", "idx")
        )

    @staticmethod
    def _generate_random_vector(dim):
        """Return a list of ``dim`` floats drawn with ``random.random()``."""
        components = []
        for _ in range(dim):
            components.append(random.random())
        return components

    @staticmethod
    def _generate_random_str_data(dim):
        """Return a random string of ``dim`` characters from ``abcdefgh12345678``."""
        alphabet = "abcdefgh12345678"
        picked = [random.choice(alphabet) for _ in range(dim)]
        return "".join(picked)

    @staticmethod
    async def _add_data_for_hybrid_search(
        client: redis.Redis,
        items_sets=1,
        randomize_data=False,
        dim_for_random_data=4,
        use_random_str_data=False,
    ):
        """Store ``5 * items_sets`` product hashes under keys ``item:0``, ``item:1``, ...

        Each hash gets a description, the same vector in ``embedding`` and
        ``embedding-hnsw``, ``price = 15 + i % 4``, ``size = 10 + i % 3`` and
        ``color``/``item_type`` taken from the first two description words.

        Args:
            client: async Redis client; all writes go through one pipeline.
            items_sets: how many times the five base items are repeated.
            randomize_data: use random float vectors instead of fixed ones.
            dim_for_random_data: vector dimension for random data.
            use_random_str_data: store random ASCII strings instead of
                packed FLOAT32 vectors (takes precedence over randomize_data).
        """
        if randomize_data or use_random_str_data:
            if use_random_str_data:
                # Four 1-byte ASCII chars per dimension, so the string is as
                # long in bytes as dim_for_random_data FLOAT32 values.
                make_vector = TestHybridSearch._generate_random_str_data
                length = dim_for_random_data * 4
            else:
                make_vector = TestHybridSearch._generate_random_vector
                length = dim_for_random_data

            items = [
                (make_vector(length), description)
                for description in (
                    "red shoes",
                    "green shoes with red laces",
                    "red dress",
                    "orange dress",
                    "black shoes",
                )
            ]
        else:
            items = [
                ([1.0, 2.0, 7.0, 8.0], "red shoes"),
                ([1.0, 4.0, 7.0, 8.0], "green shoes with red laces"),
                ([1.0, 2.0, 6.0, 5.0], "red dress"),
                ([2.0, 3.0, 6.0, 5.0], "orange dress"),
                ([5.0, 6.0, 7.0, 8.0], "black shoes"),
            ]

        pipe = client.pipeline()
        for i, (vec, description) in enumerate(items * items_sets):
            # Random strings are stored verbatim; numeric vectors are packed as
            # FLOAT32 to match the vector fields' declared type.
            embedding = (
                vec if use_random_str_data else np.array(vec, dtype=np.float32).tobytes()
            )
            color, item_type = description.split(" ")[:2]
            pipe.hset(
                f"item:{i}",
                mapping={
                    "description": description,
                    "embedding": embedding,
                    "embedding-hnsw": embedding,
                    "price": 15 + i % 4,
                    "color": color,
                    "item_type": item_type,
                    "size": 10 + i % 3,
                },
            )
        await pipe.execute()  # Execute all at once

    @staticmethod
    def _convert_dict_values_to_str(list_of_dicts):
        """Return copies of the dicts with every value passed through ``safe_str``.

        List values are converted element-wise; keys are left untouched.
        """

        def as_str(value):
            if isinstance(value, list):
                return [safe_str(element) for element in value]
            return safe_str(value)

        return [
            {key: as_str(value) for key, value in entry.items()}
            for entry in list_of_dicts
        ]

    @staticmethod
    def compare_list_of_dicts(actual, expected):
        """Assert ``actual`` has the same length as ``expected`` and contains each expected dict.

        Order is ignored. Multiplicity is not checked beyond the overall length.
        """
        assert len(actual) == len(expected), (
            f"List of dicts length mismatch: {len(actual)} != {len(expected)}. "
            f"Full dicts: actual:{actual}; expected:{expected}"
        )
        for expected_dict_item in expected:
            assert expected_dict_item in actual, (
                f"Dict {expected_dict_item} not found in actual list of dicts: {actual}. "
                f"All expected:{expected}"
            )

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_basic_hybrid_search(self, decoded_r):
        """A default hybrid search returns one full page of keyed, scored results."""
        # Create index and add data
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=5)

        # set search query
        search_query = HybridSearchQuery("@color:{red} @color:{green}")

        vsim_query = HybridVsimQuery(
            vector_field_name="@embedding",
            vector_data=np.array([-100, -200, -200, -300], dtype=np.float32).tobytes(),
        )

        hybrid_query = HybridQuery(search_query, vsim_query)

        res = await decoded_r.ft().hybrid_search(query=hybrid_query)

        # the default results count limit is 10
        if is_resp2_connection(decoded_r):
            assert res.total_results == 10
            assert len(res.results) == 10
            assert res.warnings == []
            assert res.execution_time > 0
            # Iterate the results directly instead of re-indexing with a
            # duplicated literal count.
            assert all(isinstance(item["__score"], bytes) for item in res.results)
            assert all(isinstance(item["__key"], bytes) for item in res.results)
        else:
            assert res["total_results"] == 10
            assert len(res["results"]) == 10
            assert res["warnings"] == []
            assert res["execution_time"] > 0
            assert all(isinstance(item["__score"], str) for item in res["results"])
            assert all(isinstance(item["__key"], str) for item in res["results"])

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_scorer(self, decoded_r):
        """Changing the text scorer (TFIDF, then BM25) changes the returned __score."""
        # Create index and add data
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        resp2 = is_resp2_connection(decoded_r)

        def check_page(res, expected):
            if resp2:
                assert res.total_results >= 2
                assert len(res.results) == 2
                assert res.results == expected
                assert res.warnings == []
            else:
                assert res["total_results"] >= 2
                assert len(res["results"]) == 2
                assert res["results"] == self._convert_dict_values_to_str(expected)
                assert res["warnings"] == []

        text_query = HybridSearchQuery("shoes")
        text_query.scorer("TFIDF")
        vector_query = HybridVsimQuery(
            vector_field_name="@embedding",
            vector_data=np.array([1, 2, 2, 3], dtype=np.float32).tobytes(),
        )
        hybrid_query = HybridQuery(text_query, vector_query)

        # ALPHA=1, BETA=0: presumably weights only the text-side score.
        combine_method = CombineResultsMethod(
            CombinationMethods.LINEAR, ALPHA=1, BETA=0
        )

        post_processing = HybridPostProcessingConfig()
        post_processing.load(
            "@description", "@color", "@price", "@size", "@__score", "@__item"
        )
        post_processing.limit(0, 2)

        async def run_search():
            return await decoded_r.ft().hybrid_search(
                query=hybrid_query,
                combine_method=combine_method,
                post_processing=post_processing,
                timeout=10,
            )

        check_page(
            await run_search(),
            [
                {
                    "description": b"red shoes",
                    "color": b"red",
                    "price": b"15",
                    "size": b"10",
                    "__score": b"2",
                },
                {
                    "description": b"green shoes with red laces",
                    "color": b"green",
                    "price": b"16",
                    "size": b"11",
                    "__score": b"2",
                },
            ],
        )

        # hybrid_query holds text_query, so this switches the next search to BM25.
        text_query.scorer("BM25")
        check_page(
            await run_search(),
            [
                {
                    "description": b"red shoes",
                    "color": b"red",
                    "price": b"15",
                    "size": b"10",
                    "__score": b"0.657894719299",
                },
                {
                    "description": b"green shoes with red laces",
                    "color": b"green",
                    "price": b"16",
                    "size": b"11",
                    "__score": b"0.657894719299",
                },
            ],
        )

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_vsim_filter(self, decoded_r):
        """A FILTER on the vector side restricts hits to the given price/size ranges."""
        # Create index and add data
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(
            decoded_r, items_sets=5, use_random_str_data=True
        )

        vector_side = HybridVsimQuery(
            vector_field_name="@embedding",
            vector_data="abcd1234efgh5678",
        )
        vector_side.filter(HybridFilter("@price:[15 16] @size:[10 11]"))
        hybrid_query = HybridQuery(HybridSearchQuery("@color:{missing}"), vector_side)

        post_processing = HybridPostProcessingConfig()
        post_processing.load("@price", "@size")

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=post_processing, timeout=10
        )

        if is_resp2_connection(decoded_r):
            hits, warnings = res.results, res.warnings
            allowed_prices, allowed_sizes = [b"15", b"16"], [b"10", b"11"]
        else:
            hits, warnings = res["results"], res["warnings"]
            allowed_prices, allowed_sizes = ["15", "16"], ["10", "11"]

        assert len(hits) > 0
        assert warnings == []
        for hit in hits:
            assert hit["price"] in allowed_prices
            assert hit["size"] in allowed_sizes

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_vsim_knn(self, decoded_r):
        """KNN method params on FLAT and HNSW vector fields cap the hits at K=3."""
        # Create index and add data
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        resp2 = is_resp2_connection(decoded_r)

        def check_knn_page(res, expected):
            if resp2:
                assert res.total_results == 3  # KNN top-k value
                assert len(res.results) == 3
                assert res.results == expected
                assert res.warnings == []
                assert res.execution_time > 0
            else:
                assert res["total_results"] == 3  # KNN top-k value
                assert len(res["results"]) == 3
                assert res["results"] == self._convert_dict_values_to_str(expected)
                assert res["warnings"] == []
                assert res["execution_time"] > 0

        # this query won't have results, so we will be able to validate vsim results
        search_query = HybridSearchQuery("@color:{none}")
        query_vector = np.array([1, 2, 2, 3], dtype=np.float32).tobytes()

        flat_side = HybridVsimQuery(
            vector_field_name="@embedding", vector_data=query_vector
        )
        flat_side.vsim_method_params(VectorSearchMethods.KNN, K=3)
        flat_res = await decoded_r.ft().hybrid_search(
            query=HybridQuery(search_query, flat_side),
            post_processing=HybridPostProcessingConfig(),
            timeout=10,
        )
        check_knn_page(
            flat_res,
            [
                {"__key": b"item:2", "__score": b"0.016393442623"},
                {"__key": b"item:7", "__score": b"0.0161290322581"},
                {"__key": b"item:12", "__score": b"0.015873015873"},
            ],
        )

        hnsw_side = HybridVsimQuery(
            vector_field_name="@embedding-hnsw", vector_data=query_vector
        )
        hnsw_side.vsim_method_params(VectorSearchMethods.KNN, K=3, EF_RUNTIME=1)
        hnsw_res = await decoded_r.ft().hybrid_search(
            query=HybridQuery(search_query, hnsw_side), timeout=10
        )
        check_knn_page(
            hnsw_res,
            [
                {"__key": b"item:12", "__score": b"0.016393442623"},
                {"__key": b"item:22", "__score": b"0.0161290322581"},
                {"__key": b"item:27", "__score": b"0.015873015873"},
            ],
        )

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_vsim_range(self, decoded_r):
        """Validate RANGE vector search on the FLAT and the HNSW vector field.

        The text branch (``@color:{none}``) is expected to return nothing, so
        the first page of three rows (``LIMIT 0 3``) reflects only the VSIM
        branch.
        """

        def assert_first_three(response, expected):
            # RESP2 exposes attributes with bytes values; RESP3 returns a
            # mapping whose values are compared as str.
            if is_resp2_connection(decoded_r):
                assert response.total_results >= 3  # at least 3 results
                assert len(response.results) == 3
                assert response.results == expected
                assert response.warnings == []
                assert response.execution_time > 0
            else:
                assert response["total_results"] >= 3
                assert len(response["results"]) == 3
                assert response["results"] == self._convert_dict_values_to_str(
                    expected
                )
                assert response["warnings"] == []
                assert response["execution_time"] > 0

        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        query_vector = np.array([1, 2, 7, 6], dtype=np.float32).tobytes()
        # A text query without hits leaves only the vector-similarity results.
        empty_text_query = HybridSearchQuery("@color:{none}")

        # Shared by both searches: keep only the first three rows.
        first_page = HybridPostProcessingConfig()
        first_page.limit(0, 3)

        # FLAT field, RANGE with a radius only.
        flat_range = HybridVsimQuery(
            vector_field_name="@embedding",
            vector_data=query_vector,
        )
        flat_range.vsim_method_params(VectorSearchMethods.RANGE, RADIUS=2)
        flat_response = await decoded_r.ft().hybrid_search(
            query=HybridQuery(empty_text_query, flat_range),
            post_processing=first_page,
            timeout=10,
        )
        assert_first_three(
            flat_response,
            [
                {"__key": b"item:2", "__score": b"0.016393442623"},
                {"__key": b"item:7", "__score": b"0.0161290322581"},
                {"__key": b"item:12", "__score": b"0.015873015873"},
            ],
        )

        # HNSW field, RANGE with radius and EPSILON.
        hnsw_range = HybridVsimQuery(
            vector_field_name="@embedding-hnsw",
            vector_data=query_vector,
        )
        hnsw_range.vsim_method_params(
            VectorSearchMethods.RANGE, RADIUS=2, EPSILON=0.5
        )
        hnsw_response = await decoded_r.ft().hybrid_search(
            query=HybridQuery(empty_text_query, hnsw_range),
            post_processing=first_page,
            timeout=10,
        )
        assert_first_three(
            hnsw_response,
            [
                {"__key": b"item:27", "__score": b"0.016393442623"},
                {"__key": b"item:12", "__score": b"0.0161290322581"},
                {"__key": b"item:22", "__score": b"0.015873015873"},
            ],
        )

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_combine_all_score_aliases(self, decoded_r):
        """Check the YIELD_SCORE_AS aliases for text, vector and combined scores.

        Every row must carry ``combined_score`` and no default ``__score``.
        ``search_score`` and ``vsim_score`` are expected only on the keys that
        the respective branch returned.
        """
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(
            decoded_r, items_sets=1, use_random_str_data=True
        )

        text_query = HybridSearchQuery("shoes")
        text_query.yield_score_as("search_score")

        hnsw_knn = HybridVsimQuery(
            vector_field_name="@embedding-hnsw",
            vector_data="abcd1234efgh5678",
            vsim_search_method=VectorSearchMethods.KNN,
            vsim_search_method_params={"K": 3, "EF_RUNTIME": 1},
            yield_score_as="vsim_score",
        )

        linear_combine = CombineResultsMethod(
            CombinationMethods.LINEAR,
            ALPHA=0.5,
            BETA=0.5,
            YIELD_SCORE_AS="combined_score",
        )

        res = await decoded_r.ft().hybrid_search(
            query=HybridQuery(text_query, hnsw_knn),
            combine_method=linear_combine,
            timeout=10,
        )

        # Row keys are compared as bytes under RESP2 and as str under RESP3.
        if is_resp2_connection(decoded_r):
            rows, warnings = res.results, res.warnings
            text_hits = (b"item:0", b"item:1", b"item:4")
            vector_hits = (b"item:0", b"item:1", b"item:2")
        else:
            rows, warnings = res["results"], res["warnings"]
            text_hits = ("item:0", "item:1", "item:4")
            vector_hits = ("item:0", "item:1", "item:2")

        assert len(rows) > 0
        assert warnings == []
        for row in rows:
            assert row["combined_score"] is not None
            # With a combined-score alias, the test expects no __score field.
            assert "__score" not in row
            if row["__key"] in text_hits:
                assert row["search_score"] is not None
            else:
                assert "search_score" not in row
            if row["__key"] in vector_hits:
                assert row["vsim_score"] is not None
            else:
                assert "vsim_score" not in row

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_combine(self, decoded_r):
        """Run one hybrid query under LINEAR and two RRF combination setups.

        Each run keeps the first three rows (``LIMIT 0 3``) and compares keys
        and scores against precomputed expectations.
        """
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        first_page = HybridPostProcessingConfig()
        first_page.limit(0, 3)

        # (combination method, expected rows), executed in this order.
        scenarios = [
            (
                CombineResultsMethod(CombinationMethods.LINEAR, ALPHA=0.5, BETA=0.5),
                [
                    {"__key": b"item:2", "__score": b"0.166666666667"},
                    {"__key": b"item:7", "__score": b"0.166666666667"},
                    {"__key": b"item:12", "__score": b"0.166666666667"},
                ],
            ),
            (
                # RRF with both WINDOW and CONSTANT supplied.
                CombineResultsMethod(CombinationMethods.RRF, WINDOW=3, CONSTANT=0.5),
                [
                    {"__key": b"item:2", "__score": b"1.06666666667"},
                    {"__key": b"item:0", "__score": b"0.666666666667"},
                    {"__key": b"item:7", "__score": b"0.4"},
                ],
            ),
            (
                # RRF with only WINDOW supplied (CONSTANT omitted).
                CombineResultsMethod(CombinationMethods.RRF, WINDOW=3),
                [
                    {"__key": b"item:2", "__score": b"0.032522474881"},
                    {"__key": b"item:0", "__score": b"0.016393442623"},
                    {"__key": b"item:7", "__score": b"0.0161290322581"},
                ],
            ),
        ]

        for combine_method, expected in scenarios:
            response = await decoded_r.ft().hybrid_search(
                query=hybrid_query,
                combine_method=combine_method,
                post_processing=first_page,
                timeout=10,
            )
            if is_resp2_connection(decoded_r):
                assert response.total_results >= 3
                assert len(response.results) == 3
                assert response.results == expected
                assert response.warnings == []
                assert response.execution_time > 0
            else:
                assert response["total_results"] >= 3
                assert len(response["results"]) == 3
                assert response["results"] == self._convert_dict_values_to_str(
                    expected
                )
                assert response["warnings"] == []
                assert response["execution_time"] > 0

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_load(self, decoded_r):
        """LOAD selected fields, one of them aliased, for the top combined hit."""
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red|green|black}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )
        linear_combine = CombineResultsMethod(
            CombinationMethods.LINEAR, ALPHA=0.5, BETA=0.5
        )

        # Keep only the best row; expose the document key as "item_key".
        top_hit = HybridPostProcessingConfig()
        top_hit.load("@description", "@color", "@price", "@size", "@__key AS item_key")
        top_hit.limit(0, 1)

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query,
            combine_method=linear_combine,
            post_processing=top_hit,
            timeout=10,
        )

        expected_results = [
            {
                "description": b"red dress",
                "color": b"red",
                "price": b"17",
                "size": b"12",
                "item_key": b"item:2",
            }
        ]

        # Normalise the protocol-specific reply shape before asserting.
        if is_resp2_connection(decoded_r):
            total, rows = res.total_results, res.results
            warnings, elapsed = res.warnings, res.execution_time
            wanted = expected_results
        else:
            total, rows = res["total_results"], res["results"]
            warnings, elapsed = res["warnings"], res["execution_time"]
            wanted = self._convert_dict_values_to_str(expected_results)

        assert total >= 1
        assert len(rows) == 1
        self.compare_list_of_dicts(rows, wanted)
        assert warnings == []
        assert elapsed > 0

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_load_and_apply(self, decoded_r):
        """LOAD fields and derive two computed fields with chained APPLYs.

        ``tax_discount`` is computed from ``price_discount``, so the APPLY
        expressions are passed in that dependency order.
        """
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        pipeline = HybridPostProcessingConfig()
        pipeline.load("@color", "@price", "@size")
        pipeline.apply(
            price_discount="@price - (@price * 0.1)",
            tax_discount="@price_discount * 0.2",
        )
        pipeline.limit(0, 3)

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=pipeline, timeout=10
        )

        expected_results = [
            {
                "color": b"red",
                "price": b"15",
                "size": b"10",
                "price_discount": b"13.5",
                "tax_discount": b"2.7",
            },
            {
                "color": b"red",
                "price": b"17",
                "size": b"12",
                "price_discount": b"15.3",
                "tax_discount": b"3.06",
            },
            {
                "color": b"red",
                "price": b"18",
                "size": b"11",
                "price_discount": b"16.2",
                "tax_discount": b"3.24",
            },
        ]

        # Normalise the protocol-specific reply shape before asserting.
        if is_resp2_connection(decoded_r):
            rows, warnings, elapsed = res.results, res.warnings, res.execution_time
            wanted = expected_results
        else:
            rows = res["results"]
            warnings, elapsed = res["warnings"], res["execution_time"]
            wanted = self._convert_dict_values_to_str(expected_results)

        assert len(rows) == 3
        self.compare_list_of_dicts(rows, wanted)
        assert warnings == []
        assert elapsed > 0

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_load_and_filter(self, decoded_r):
        """FILTER the post-processed rows on a LOADed field."""
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red|green|black}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        pipeline = HybridPostProcessingConfig()
        pipeline.load("@description", "@color", "@price", "@size")
        # The post-processing filter works on the loaded values, which are
        # all interpreted as strings (the original field types are not
        # preserved), hence the quoted "15".
        pipeline.filter(HybridFilter('@price=="15"'))
        pipeline.limit(0, 3)

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=pipeline, timeout=10
        )

        # Values come back as bytes under RESP2 and as str under RESP3.
        if is_resp2_connection(decoded_r):
            rows, warnings, elapsed = res.results, res.warnings, res.execution_time
            wanted_price = b"15"
        else:
            rows = res["results"]
            warnings, elapsed = res["warnings"], res["execution_time"]
            wanted_price = "15"

        assert len(rows) == 3
        assert all(row["price"] == wanted_price for row in rows)
        assert warnings == []
        assert elapsed > 0

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_load_apply_and_params(self, decoded_r):
        """Use PARAMS substitution for both the tag filter and the query vector."""
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(
            decoded_r, items_sets=5, use_random_str_data=True
        )

        # "$color_criteria" and "$vector" are filled in from `params` below.
        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{$color_criteria}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data="$vector",
            ),
        )

        pipeline = HybridPostProcessingConfig()
        pipeline.load("@description", "@color", "@price")
        pipeline.apply(price_discount="@price - (@price * 0.1)")
        pipeline.limit(0, 3)

        params = {
            "vector": "abcd1234abcd5678",
            "color_criteria": "red",
        }

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query,
            post_processing=pipeline,
            params_substitution=params,
            timeout=10,
        )

        expected_results = [
            {
                "description": b"red shoes",
                "color": b"red",
                "price": b"15",
                "price_discount": b"13.5",
            },
            {
                "description": b"red dress",
                "color": b"red",
                "price": b"17",
                "price_discount": b"15.3",
            },
            {
                "description": b"red shoes",
                "color": b"red",
                "price": b"16",
                "price_discount": b"14.4",
            },
        ]

        # Normalise the protocol-specific reply shape before asserting.
        if is_resp2_connection(decoded_r):
            rows, warnings, elapsed = res.results, res.warnings, res.execution_time
            wanted = expected_results
        else:
            rows = res["results"]
            warnings, elapsed = res["warnings"], res["execution_time"]
            wanted = self._convert_dict_values_to_str(expected_results)

        assert len(rows) == 3
        assert rows == wanted
        assert warnings == []
        assert elapsed > 0

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_limit(self, decoded_r):
        """Check that ``LIMIT 0 3`` yields a page of exactly three rows."""
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        first_page = HybridPostProcessingConfig()
        first_page.limit(0, 3)

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=first_page, timeout=10
        )

        if is_resp2_connection(decoded_r):
            rows, warnings = res.results, res.warnings
        else:
            rows, warnings = res["results"], res["warnings"]

        assert len(rows) == 3
        assert warnings == []

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_load_apply_and_sortby(self, decoded_r):
        """SORTBY a computed field (descending), then by color (ascending)."""
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=1)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red|green}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        pipeline = HybridPostProcessingConfig()
        pipeline.load("@color", "@price")
        pipeline.apply(price_discount="@price - (@price * 0.1)")
        pipeline.sort_by(
            SortbyField("@price_discount", asc=False), SortbyField("@color", asc=True)
        )
        pipeline.limit(0, 5)

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=pipeline, timeout=10
        )

        # Order matters: the SORTBY above fixes the row sequence.
        expected_results = [
            {"color": b"orange", "price": b"18", "price_discount": b"16.2"},
            {"color": b"red", "price": b"17", "price_discount": b"15.3"},
            {"color": b"green", "price": b"16", "price_discount": b"14.4"},
            {"color": b"black", "price": b"15", "price_discount": b"13.5"},
            {"color": b"red", "price": b"15", "price_discount": b"13.5"},
        ]

        # Normalise the protocol-specific reply shape before asserting.
        if is_resp2_connection(decoded_r):
            total, rows = res.total_results, res.results
            warnings, elapsed = res.warnings, res.execution_time
            wanted = expected_results
        else:
            total, rows = res["total_results"], res["results"]
            warnings, elapsed = res["warnings"], res["execution_time"]
            wanted = self._convert_dict_values_to_str(expected_results)

        assert total >= 5
        assert len(rows) == 5
        assert rows == wanted
        assert warnings == []
        assert elapsed > 0

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_timeout(self, decoded_r):
        """Compare a generous timeout with a 1 ms timeout on a large dataset.

        With 5000 ms the query must finish cleanly and within the budget.
        With 1 ms the reply is expected to carry a timeout warning from either
        the VSIM or the SEARCH branch.
        """
        dim = 128
        await self._create_hybrid_search_index(decoded_r, dim=dim)
        await self._add_data_for_hybrid_search(
            decoded_r,
            items_sets=5000,
            dim_for_random_data=dim,
            use_random_str_data=True,
        )

        hnsw_knn = HybridVsimQuery(
            vector_field_name="@embedding-hnsw",
            vector_data="abcd" * dim,
        )
        hnsw_knn.vsim_method_params(VectorSearchMethods.KNN, K=1000)
        hnsw_knn.filter(
            HybridFilter(
                "((@price:[15 16] @size:[10 11]) | (@price:[13 15] @size:[11 12])) @description:(shoes) -@description:(green)"
            )
        )
        hybrid_query = HybridQuery(HybridSearchQuery("*"), hnsw_knn)

        timeout = 5000  # 5 second timeout, in milliseconds
        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query,
            combine_method=CombineResultsMethod(CombinationMethods.RRF, WINDOW=1000),
            timeout=timeout,
        )

        if is_resp2_connection(decoded_r):
            rows, warnings, elapsed = res.results, res.warnings, res.execution_time
        else:
            rows = res["results"]
            warnings, elapsed = res["warnings"], res["execution_time"]
        assert len(rows) > 0
        assert warnings == []
        assert 0 < elapsed < timeout

        # 1 ms timeout: at least one branch is expected to report it.
        res = await decoded_r.ft().hybrid_search(query=hybrid_query, timeout=1)
        if is_resp2_connection(decoded_r):
            warnings = res.warnings
            timeout_warnings = (
                b"Timeout limit was reached (VSIM)",
                b"Timeout limit was reached (SEARCH)",
            )
        else:
            warnings = res["warnings"]
            timeout_warnings = (
                "Timeout limit was reached (VSIM)",
                "Timeout limit was reached (SEARCH)",
            )
        assert any(message in warnings for message in timeout_warnings)

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_load_and_groupby(self, decoded_r):
        """GROUPBY loaded fields with a COUNT_DISTINCT reducer, then sort.

        The first run groups by price alone. The second groups by price and
        item type.
        """

        def assert_rows(response, expected):
            # RESP2 exposes attributes with bytes values; RESP3 returns a
            # mapping whose values are compared as str.
            expected_count = len(expected)
            if is_resp2_connection(decoded_r):
                rows, warnings = response.results, response.warnings
            else:
                rows, warnings = response["results"], response["warnings"]
                expected = self._convert_dict_values_to_str(expected)
            assert len(rows) == expected_count
            assert rows == expected
            assert warnings == []

        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red|green}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        # Run 1: one group per price, counting distinct colors.
        by_price = HybridPostProcessingConfig()
        by_price.load("@color", "@price", "@size", "@item_type")
        by_price.limit(0, 4)
        by_price.group_by(
            ["@price"],
            reducers.count_distinct("@color").alias("colors_count"),
        )
        by_price.sort_by(SortbyField("@price", asc=True))

        price_groups = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=by_price, timeout=10
        )
        assert_rows(
            price_groups,
            [
                {"price": b"15", "colors_count": b"2"},
                {"price": b"16", "colors_count": b"2"},
                {"price": b"17", "colors_count": b"2"},
                {"price": b"18", "colors_count": b"2"},
            ],
        )

        # Run 2: one group per (price, item_type) pair.
        by_price_and_type = HybridPostProcessingConfig()
        by_price_and_type.load("@color", "@price", "@size", "@item_type")
        by_price_and_type.limit(0, 6)
        by_price_and_type.sort_by(
            SortbyField("@price", asc=True),
            SortbyField("@item_type", asc=True),
        )
        by_price_and_type.group_by(
            ["@price", "@item_type"],
            reducers.count_distinct("@color").alias("unique_colors_count"),
        )

        # NOTE(review): this call uses timeout=1000 while the neighbouring
        # tests use 10; confirm whether the larger budget is intentional.
        price_type_groups = await decoded_r.ft().hybrid_search(
            query=hybrid_query, post_processing=by_price_and_type, timeout=1000
        )
        assert_rows(
            price_type_groups,
            [
                {"price": b"15", "item_type": b"dress", "unique_colors_count": b"1"},
                {"price": b"15", "item_type": b"shoes", "unique_colors_count": b"2"},
                {"price": b"16", "item_type": b"dress", "unique_colors_count": b"1"},
                {"price": b"16", "item_type": b"shoes", "unique_colors_count": b"2"},
                {"price": b"17", "item_type": b"dress", "unique_colors_count": b"1"},
                {"price": b"17", "item_type": b"shoes", "unique_colors_count": b"2"},
            ],
        )

    @pytest.mark.redismod
    @skip_if_server_version_lt("8.3.224")
    async def test_hybrid_search_query_with_cursor(self, decoded_r):
        """Request a cursor-based hybrid search and read one page per branch.

        The reply holds separate cursor ids for the SEARCH and the VSIM
        branch. Each cursor is read once through ``ft().aggregate`` with an
        ``aggregations.Cursor``, and each page must hold ``count=5`` rows.
        """
        await self._create_hybrid_search_index(decoded_r)
        await self._add_data_for_hybrid_search(decoded_r, items_sets=10)

        hybrid_query = HybridQuery(
            HybridSearchQuery("@color:{red|green}"),
            HybridVsimQuery(
                vector_field_name="@embedding",
                vector_data=np.array([1, 2, 7, 6], dtype=np.float32).tobytes(),
            ),
        )

        res = await decoded_r.ft().hybrid_search(
            query=hybrid_query,
            cursor=HybridCursorQuery(count=5, max_idle=100),
            timeout=10,
        )

        # Cursor ids, SEARCH first and VSIM second.
        if is_resp2_connection(decoded_r):
            assert isinstance(res, HybridCursorResult)
            cursor_ids = (res.search_cursor_id, res.vsim_cursor_id)
        else:
            cursor_ids = (res["SEARCH"], res["VSIM"])
        for cursor_id in cursor_ids:
            assert cursor_id > 0

        # Read one page from each cursor, in the same SEARCH -> VSIM order.
        for cursor_id in cursor_ids:
            page = await decoded_r.ft().aggregate(query=aggregations.Cursor(cursor_id))
            if is_resp2_connection(decoded_r):
                assert len(page.rows) == 5
            else:
                assert len(page[0]["results"]) == 5
